HBase triggers a flush in the situations listed below. Note that the minimum flush unit is the HRegion, not an individual MemStore. It follows that if an HRegion holds many MemStores, every flush is expensive, so when designing tables it is advisable to keep the number of ColumnFamilies (and the length of their names) small. The triggers are:
- checkResource
- MemStore-level limit
- MemStoreFlusher periodic check
- max logs limit
- Manual flush
- Region merge
- Region split
- Bulk-loading HFiles
- Taking a table snapshot
checkResource: the source-tracing flow is Put -> checkResource -> HRegion::flushcache -> internalFlushcache -> internalFlushCacheAndCommit. checkResources() runs during client Put, Delete, and Update operations and performs the following checks:
1) For meta regions (HBase system tables: hbase:acl, hbase:meta, hbase:namespace), no resource constraints are imposed and updates are never blocked.
2) If the region's current memstore size (memstoreSize) exceeds the threshold blockingMemStoreSize (hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size), the blocked-request counter is incremented, a MemStore flush is requested, and a RegionTooBusyException is thrown, blocking the update.
3) The core code that triggers the flush is as follows.
a. checkResources
private void checkResources() throws RegionTooBusyException {
  // If catalog region, do not impose resource constraints or block updates.
  if (this.getRegionInfo().isMetaRegion()) return;
  if (this.memstoreSize.get() > this.blockingMemStoreSize) {
    blockedRequestsCount.increment();
    requestFlush();
    throw new RegionTooBusyException("Above memstore limit, "
        + "regionName=" + (this.getRegionInfo() == null ? "unknown" :
            this.getRegionInfo().getRegionNameAsString())
        + ", server=" + (this.getRegionServerServices() == null ? "unknown" :
            this.getRegionServerServices().getServerName())
        + ", memstoreSize=" + memstoreSize.get()
        + ", blockingMemStoreSize=" + blockingMemStoreSize);
  }
}
b. memstoreSize is the current total MemStore size of the HRegion; it is updated in real time by calls to addAndGetGlobalMemstoreSize() during Put, Append, and similar operations:
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
if (this.rsAccounting != null) {
rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
}
return this.memstoreSize.addAndGet(memStoreSize);
}
c. blockingMemStoreSize is the MemStore threshold configured on the HRegion; when the MemStore size exceeds it, update operations are blocked. It is set in setHTableSpecificConf(), which is called when the HRegion is constructed as it comes online:
void setHTableSpecificConf() {
  if (this.htableDescriptor == null) return;
  long flushSize = this.htableDescriptor.getMemStoreFlushSize();
if (flushSize <= 0) {
flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
}
this.memstoreFlushSize = flushSize;
this.blockingMemStoreSize = this.memstoreFlushSize *
conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
}
blockingMemStoreSize = hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size; with the defaults this is 4 * 128 MB = 512 MB.
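To make the arithmetic concrete, a minimal self-contained sketch (the values are the defaults quoted above, hard-coded rather than read from a live configuration):
// Minimal sketch of the region-level blocking-threshold arithmetic.
public class BlockingThresholdDemo {
  public static void main(String[] args) {
    long flushSize = 128L * 1024 * 1024; // hbase.hregion.memstore.flush.size, default 128 MB
    long multiplier = 4L;                // hbase.hregion.memstore.block.multiplier, default 4
    long blockingMemStoreSize = flushSize * multiplier;
    System.out.println(blockingMemStoreSize); // 536870912 bytes = 512 MB
  }
}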
d. checkResources() calls requestFlush() to issue the flush request. 1. rsServices is the service interface exposed by the HRegionServer; through it the HRegion obtains the helper objects that carry out operations such as flush, compact, and split. 2. The writestate is checked: if flushRequested is already set, the method returns immediately to avoid a duplicate request; otherwise flushRequested is set to true and the request proceeds. Once the check passes, rsServices supplies the FlushRequester, whose requestFlush() is called with the HRegion itself, issuing the flush request. This FlushRequester is the HRegionServer's cacheFlusher: its requestFlush() adds the request to a queue that internal worker threads drain.
private void requestFlush() {
if (this.rsServices == null) {
return;
}
synchronized (writestate) {
if (this.writestate.isFlushRequested()) {
return;
}
writestate.flushRequested = true;
}
// Make request outside of synchronize block; HBASE-818.
this.rsServices.getFlushRequester().requestFlush(this, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " this.getRegionInfo().getEncodedName());
}
}
MemStore-level limit: when any single MemStore in a region reaches the limit (hbase.hregion.memstore.flush.size, default 128 MB), a memstore flush is triggered. The size check is HRegion.isFlushSize (a sketch of it follows the log excerpt below), and the flush logs look like this:
2017-05-27 11:27:48,017 INFO [MemStoreFlusher.1] regionserver.HRegion: Started memstore flush for TestTable,1490887422950.bab2e928889983bdaa89578ee279a1f4., current region memstore size 128.21 MB
2017-05-27 11:27:48,420 INFO [MemStoreFlusher.1] regionserver.DefaultStoreFlusher: Flushed, sequenceid=9369, memsize=128.2 M, hasBloomFilter=true, into tmp file hdfs://nn209003:9000/dev_hbase/data/default/TestTable/bab2e928889983bdaa89578ee279a1f4/.tmp/2bd6268e6b4e41689777181b0af983f8
2017-05-27 11:27:48,428 INFO [MemStoreFlusher.1] regionserver.HStore: Added hdfs://nn209003:9000/dev_hbase/data/default/TestTable/bab2e928889983bdaa89578ee279a1f4/info/2bd6268e6b4e41689777181b0af983f8, entries=89585, sequenceid=9369, filesize=90.5 M
2017-05-27 11:27:48,429 INFO [MemStoreFlusher.1] regionserver.HRegion: Finished memstore flush of ~128.21 MB/134442016, currentsize=3.82 MB/4007840 for region TestTable,1490887422950.bab2e928889983bdaa89578ee279a1f4. in 412ms, sequenceid=9369, compaction requested=true
Note that once the memstore flush finishes, a compaction is requested (compaction requested=true in the last log line).
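For reference, a minimal sketch of the size check named above, modeled on the branch-1 era HRegion (the exact shape may differ across versions):
// Sketch of HRegion.isFlushSize: memstoreFlushSize comes from
// hbase.hregion.memstore.flush.size (or the table-level override).
private boolean isFlushSize(final long size) {
  return size > this.memstoreFlushSize;
}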
MemStoreFlusher periodic check: a periodic task ensures MemStores do not go unpersisted for too long; its default period is 1 hour (hbase.regionserver.optionalcacheflushinterval = 3600000 ms). To keep all MemStores from flushing at the same moment, each periodic flush is delayed by a random amount of up to roughly 20,000 ms. Setting hbase.regionserver.optionalcacheflushinterval to 0 disables the periodic flush. The decision logic is in HRegion#shouldFlush():
boolean shouldFlush() {
  if (this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) {
    // Guard against the memstore accumulating too many changes since the last flush.
    return true;
  }
  if (flushCheckInterval <= 0) { // disabled
    return false;
  }
  long now = EnvironmentEdgeManager.currentTimeMillis();
  // If we flushed in the recent past, we don't need to do it again now.
  if ((now - getLastFlushTime() < flushCheckInterval)) {
    // Minimum interval between flushes:
    // hbase.regionserver.optionalcacheflushinterval, default 1 hour.
    return false;
  }
  // Since we didn't flush in the recent past, flush now if certain conditions
  // are met. Return true on the first such memstore hit.
  for (Store s : this.getStores().values()) {
    if (s.timeOfOldestEdit() < now - flushCheckInterval) {
      // Some store has an edit older than flushCheckInterval (one hour ago by
      // default): we have an old enough edit in the memstore, so flush.
      return true;
    }
  }
  return false;
}
- Based on the sequence id: if too many changes have accumulated in the memstore since the last flush, flush.
- Compare the time since the last flush: if the interval has not elapsed, do not flush (purely in time terms, the memstore is not old enough).
- With a clearly small write volume, the behavior is driven by the periodic flush thread: if some store has not been updated for a long time and its newest edit is older than the threshold (default 1 hour), the flush is scheduled after a random delay, as the sketch below shows.
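A simplified sketch of that periodic chore, paraphrased from HRegionServer.PeriodicMemstoreFlusher in branch-1 (field and method names are from that branch and may differ in other versions):
// Simplified: walk the online regions; for each region whose shouldFlush()
// returns true, request a flush after a random delay of up to ~20 s so that
// regions do not all flush at the same instant.
static class PeriodicMemstoreFlusher extends ScheduledChore {
  static final int RANGE_OF_DELAY = 20000; // milliseconds
  static final int MIN_DELAY_TIME = 0;     // milliseconds
  final HRegionServer server;

  PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
    super("MemstoreFlusherChore", server, cacheFlushInterval);
    this.server = server;
  }

  @Override
  protected void chore() {
    for (Region r : this.server.onlineRegions.values()) {
      if (r == null) continue;
      if (((HRegion) r).shouldFlush()) {
        FlushRequester requester = server.getFlushRequester();
        if (requester != null) {
          // org.apache.commons.lang.math.RandomUtils supplies the jitter.
          long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
          requester.requestDelayedFlush(r, randomDelay, false);
        }
      }
    }
  }
}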
1) Region Server level limit: when the total size of all MemStores on a Region Server reaches the upper limit (hbase.regionserver.global.memstore.size * hbase_heapsize, default 40% of JVM heap), some MemStores are flushed. Flushing proceeds from the largest MemStore down: the region with the largest MemStore is flushed first, then the next largest, and so on until total MemStore usage falls below the low watermark (hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size * hbase_heapsize, default 38% of hbase_heap). Flushing begins before the hard hbase.regionserver.global.memstore.size limit is hit, avoiding blocked writes. The calculation: 0.95 * 0.4 * hbase_heapsize = 0.38 * hbase_heapsize. A toy sketch of the ordering follows.
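To illustrate "largest MemStore first", a self-contained toy sketch (region names and sizes are invented; the real logic lives in MemStoreFlusher.flushOneForGlobalPressure()):
import java.util.SortedMap;
import java.util.TreeMap;

// Toy model: flush the region with the biggest memstore first, and keep
// flushing until total usage drops below the low watermark.
public class FlushOrderingDemo {
  public static void main(String[] args) {
    SortedMap<Long, String> regionsBySize = new TreeMap<>();
    regionsBySize.put(300L, "regionA"); // sizes in pretend MB
    regionsBySize.put(120L, "regionB");
    regionsBySize.put(80L, "regionC");
    long total = 500L, lowWatermark = 190L;
    while (total > lowWatermark && !regionsBySize.isEmpty()) {
      long biggest = regionsBySize.lastKey();                 // largest first
      System.out.println("flush " + regionsBySize.remove(biggest));
      total -= biggest;                                       // memory released
    }
    // Prints "flush regionA" then "flush regionB"; regionC is spared because
    // usage is already back under the low watermark.
  }
}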
2) Region level limit: when the total size of all MemStores in a region reaches hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size (default 4 * 128 MB = 512 MB), a memstore flush is triggered and updates are blocked. Do not set this value too large: it raises the chance that the whole RS's memstore usage exceeds the hbase.regionserver.global.memstore.size limit, which in turn raises the chance of blocking writes across the entire RS. When a region blocks, many threads pile up on it, fewer handler threads remain for other regions, and overall RS capacity degrades. In one observed case, hbase.hregion.memstore.block.multiplier was set to 2 and the memstore size of 432 MB exceeded the 256 MB threshold, so blocking occurred: it lasted from second 11 to second 20 of the same minute, 9 seconds during which nothing could be written, likely tying up many RS handler threads and starving other regions and operations. Writing asynchronously with a rate limit can avoid such blocking. For scans, note that hbase.client.scanner.caching was 2147483647; consider lowering it, say to 1000?
max logs limit: when the number of HLogs on a Region Server reaches the cap (configurable via hbase.regionserver.maxlogs), the system picks the earliest HLog and flushes the region(s) holding edits in it, regardless of whether their memstores are full. The default is 32, which is usually far from enough. The sizing principle: hbase.regionserver.hlog.blocksize * hbase.regionserver.maxlogs should be slightly larger than hbase.regionserver.global.memstore.lowerLimit * HBASE_HEAPSIZE, where hbase.regionserver.hlog.blocksize is 0.95 * HDFS block size (to avoid crossing HDFS blocks). With the newer parameter names, the right-hand side becomes hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size * HBASE_HEAPSIZE. In the MemStoreFlusher constructor, the getGlobalMemStoreLowerMark() call handles the difference: an old-style value is divided by globalMemStorePercent (hbase.regionserver.global.memstore.size), while a new-style value is returned directly as lowMarkPercent.
The heuristic (see HBASE-14951) is:
maxLogs = Math.max(32, HBASE_HEAP_SIZE * memstoreRatio * 2 / LogRollSize)
where memstoreRatio = hbase.regionserver.global.memstore.size and LogRollSize is the maximum WAL file size (default 0.95 * HDFS block size). For a 16 GB hbase_heap the default configuration yields about 53, so the default of 32 is far from enough; around 100 is a sensible setting for a 16 GB heap (a worked calculation follows the log excerpt below). With maxlogs=32, flush logs like these appear:
2017-05-27 11:23:26,469 INFO [regionserver/dn209002/172.17.209.2:16020.logRoller] wal.FSHLog: Too many wals: logs=33, maxlogs=32; forcing flush of 4 regions(s): db6c43e95cba8242a43e011167024c9f, 3c12e39b30bd7d9d48017b18ce2929b9, b9534207724601f581d7b8eb93cf06cf, 8c5fcd87cd46abe7a5b691b7807a875c
2017-05-27 11:23:26,470 INFO [MemStoreFlusher.0] regionserver.HRegion: Started memstore flush for TraceV2,nx00x00x00x00x00x00x00,1492655062766.db6c43e95cba8242a43e011167024c9f., current region memstore size 2.41 KB
2017-05-27 11:23:26,470 INFO [MemStoreFlusher.1] regionserver.HRegion: Started memstore flush for TraceV2,xCAx00x00x00x00x00x00x00,1492655062766.3c12e39b30bd7d9d48017b18ce2929b9., current region memstore size 688 B
2017-05-27 11:23:26,476 INFO [MemStoreFlusher.1] regionserver.DefaultStoreFlusher: Flushed, sequenceid=67, memsize=688, hasBloomFilter=true, into tmp file hdfs://nn209003:9000/dev_hbase/data/default/TraceV2/3c12e39b30bd7d9d48017b18ce2929b9/.tmp/b3ebdf4944d1498d90f38e8295d1ae22
2017-05-27 11:23:26,483 INFO [MemStoreFlusher.1] regionserver.HStore: Added hdfs://nn209003:9000/dev_hbase/data/default/TraceV2/3c12e39b30bd7d9d48017b18ce2929b9/S/b3ebdf4944d1498d90f38e8295d1ae22, entries=1, sequenceid=67, filesize=5.6 K
2017-05-27 11:23:26,484 INFO [MemStoreFlusher.1] regionserver.HRegion: Finished memstore flush of ~688 B/688, currentsize=0 B/0 for region TraceV2,xCAx00x00x00x00x00x00x00,1492655062766.3c12e39b30bd7d9d48017b18ce2929b9. in 14ms, sequenceid=67, compaction requested=false
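A worked version of the calculation above (the 256 MB HDFS block size is an assumption chosen to reproduce the ~53 figure; it is not stated in the article):
// maxLogs = Math.max(32, heap * memstoreRatio * 2 / logRollSize)
public class MaxLogsDemo {
  public static void main(String[] args) {
    double heapMb = 16 * 1024;          // 16 GB heap
    double memstoreRatio = 0.4;         // hbase.regionserver.global.memstore.size
    double logRollSizeMb = 0.95 * 256;  // 0.95 * HDFS block size (assumed 256 MB)
    long maxLogs = Math.max(32, Math.round(heapMb * memstoreRatio * 2 / logRollSizeMb));
    System.out.println(maxLogs);        // ~54, in line with the ~53 quoted above
  }
}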
Manual flush: a user can flush a table or a single region from the HBase shell with flush 'tablename' or flush 'region name'. On the RegionServer side this is initiated by RSRpcServices.flushRegion(), whose code is as follows:
/**
* Flush a region on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public FlushRegionResponse flushRegion(final RpcController controller,
final FlushRegionRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
Region region = getRegion(request.getRegion());
[LOG.info](http://log.info/)("Flushing " region.getRegionInfo().getRegionNameAsString());
boolean shouldFlush = true;
if (request.hasIfOlderThanTs()) {
shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
}
FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
if (shouldFlush) {
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false;
long startTime = EnvironmentEdgeManager.currentTime();
// Go behind the curtain so we can manage writing of the flush WAL marker
HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
((HRegion)region).flushcache(true, writeFlushWalMarker);
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region,
"Compaction through user triggered flush");
}
builder.setFlushed(flushResult.isFlushSucceeded());
builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
}
builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
return builder.build();
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of wal
// is required. Currently the only way to do this is a restart of
// the server.
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
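For completeness, a minimal client-side sketch that drives the same path through the Java Admin API (the table name is illustrative; the call lands in RSRpcServices.flushRegion() on each hosting RegionServer):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ManualFlushDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      // Equivalent to "flush 'TestTable'" in the HBase shell.
      admin.flush(TableName.valueOf("TestTable"));
    }
  }
}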
Region merge: this is a merge of two regions, not a compaction. RSRpcServices.mergeRegions() is called when regions merge; internally it flushes regionA's MemStore and then regionB's before carrying out the merge itself:
/**
* Merge regions on the region server.
*
* @param controller the RPC controller
* @param request the request
* @return merge regions response
* @throws ServiceException
*/
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public MergeRegionsResponse mergeRegions(final RpcController controller,
final MergeRegionsRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
Region regionA = getRegion(request.getRegionA());
Region regionB = getRegion(request.getRegionB());
boolean forcible = request.getForcible();
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
regionA.startRegionOperation(Operation.MERGE_REGION);
regionB.startRegionOperation(Operation.MERGE_REGION);
if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
}
[LOG.info](http://log.info/)("Receiving merging request for " regionA ", " regionB
“,forcible=” forcible);
long startTime = EnvironmentEdgeManager.currentTime();
FlushResult flushResult = regionA.flush(true);
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
startTime = EnvironmentEdgeManager.currentTime();
flushResult = regionB.flush(true);
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
masterSystemTime, RpcServer.getRequestUser());
return MergeRegionsResponse.newBuilder().build();
} catch (DroppedSnapshotException ex) {
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
Region split: RSRpcServices.splitRegion() is called when a region splits; it, too, first flushes the region's memstore:
/**
* Split a region on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public SplitRegionResponse splitRegion(final RpcController controller,
final SplitRegionRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
Region region = getRegion(request.getRegion());
region.startRegionOperation(Operation.SPLIT_REGION);
if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
throw new IOException("Can’t split replicas directly. "
“Replicas are auto-split when their primary is split.”);
}
[LOG.info](http://log.info/)("Splitting " region.getRegionInfo().getRegionNameAsString());
long startTime = EnvironmentEdgeManager.currentTime();
FlushResult flushResult = region.flush(true);
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
byte[] splitPoint = null;
if (request.hasSplitPoint()) {
splitPoint = request.getSplitPoint().toByteArray();
}
((HRegion)region).forceSplit(splitPoint);
regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
RpcServer.getRequestUser());
return SplitRegionResponse.newBuilder().build();
} catch (DroppedSnapshotException ex) {
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
Therefore, keep a close eye on the MemStore size and the MemStore flush queue size. Ideally, MemStore usage should never reach the hbase.regionserver.global.memstore.size (formerly hbase.regionserver.global.memstore.upperLimit) limit, and the flush queue size should not grow continuously.
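As a starting point for that monitoring, a minimal sketch that scrapes the RegionServer's JMX endpoint; the port, bean name, and metric names (flushQueueLength, memStoreSize) follow common HBase 1.x conventions but should be verified against your version:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class FlushQueueCheck {
  public static void main(String[] args) throws Exception {
    // RegionServer info server, default port 16030; /jmx returns metrics as JSON.
    URL url = new URL("http://rs-host:16030/jmx?qry=Hadoop:service=HBase,name=RegionServer,sub=Server");
    try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        // A steadily growing flushQueueLength means flushes cannot keep up with writes.
        if (line.contains("flushQueueLength") || line.contains("memStoreSize")) {
          System.out.println(line.trim());
        }
      }
    }
  }
}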