
MemStore Tuning and Internals


HBase triggers a flush in the situations listed below. Note that the smallest flush unit is an HRegion, not an individual MemStore. It follows that if an HRegion contains many MemStores, every flush is expensive, so when designing tables try to keep the number of ColumnFamilies (and the length of their names) small. The trigger conditions are:

  1. checkResources (resource check on writes)
  2. MemStore-level size limit
  3. Periodic MemStoreFlusher check
  4. Max logs (WAL count) limit
  5. Manual flush
  6. Region merge
  7. Region split
  8. Bulk-loading HFiles
  9. Taking a table snapshot

checkResources — source-code call chain: Put -> checkResources -> HRegion::flushcache -> internalFlushcache -> internalFlushCacheAndCommit. checkResources runs on client Put, Delete and Update operations and performs the following checks: 1) For meta regions (the HBase system tables hbase:acl, hbase:meta, hbase:namespace), no resource constraints are imposed and updates are never blocked. 2) If the region's current memstore size (memstoreSize) exceeds the threshold blockingMemStoreSize (hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size), the blocked-request counter is incremented, a MemStore flush is requested, and a RegionTooBusyException is thrown, blocking the update. 3) The core code is as follows. a. checkResources

private void checkResources() throws RegionTooBusyException {
  // If catalog region, do not impose resource constraints or block updates.
  if (this.getRegionInfo().isMetaRegion()) return;

  if (this.memstoreSize.get() > this.blockingMemStoreSize) {
    blockedRequestsCount.increment();
    requestFlush();
    throw new RegionTooBusyException("Above memstore limit, " +
        "regionName=" + (this.getRegionInfo() == null ? "unknown" :
            this.getRegionInfo().getRegionNameAsString()) +
        ", server=" + (this.getRegionServerServices() == null ? "unknown" :
            this.getRegionServerServices().getServerName()) +
        ", memstoreSize=" + memstoreSize.get() +
        ", blockingMemStoreSize=" + blockingMemStoreSize);
  }
}

b. memstoreSize is the current size of the HRegion's MemStores; it is updated in real time by calls to addAndGetGlobalMemstoreSize() during Put, Append and similar operations.

public long addAndGetGlobalMemstoreSize(long memStoreSize) {
  if (this.rsAccounting != null) {
    rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
  }
  return this.memstoreSize.addAndGet(memStoreSize);
}

c. blockingMemStoreSize is the per-HRegion MemStore threshold above which data updates are blocked. It is set in setHTableSpecificConf(), which is called when the HRegion is constructed as it comes online:

void setHTableSpecificConf() {
  if (this.htableDescriptor == null) return;
  long flushSize = this.htableDescriptor.getMemStoreFlushSize();

  if (flushSize <= 0) {
    flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
  }
  this.memstoreFlushSize = flushSize;
  this.blockingMemStoreSize = this.memstoreFlushSize *
      conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
          HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
}

blockingMemStoreSize = hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size; with the defaults this is 4 * 128 MB = 512 MB.
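The two parameters involved can be tuned in hbase-site.xml; a sketch with the default values (raising the multiplier trades blocking risk for fewer forced flushes):

```xml
<!-- hbase-site.xml: parameters that define the per-region blocking threshold -->
<property>
  <name>hbase.hregion.memstore.flush.size</name>
  <!-- 128 MB: flush a MemStore once it reaches this size -->
  <value>134217728</value>
</property>
<property>
  <name>hbase.hregion.memstore.block.multiplier</name>
  <!-- block updates when the region memstore exceeds 4 * flush.size = 512 MB -->
  <value>4</value>
</property>
```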

d. checkResources calls requestFlush() to issue the flush request. 1) rsServices is the service interface provided by the HRegionServer; through it the HRegion obtains the helper objects that carry out operations such as flush, compact and split. 2) requestFlush() checks writestate: if flushRequested is already set it returns immediately to avoid duplicate requests; otherwise it sets flushRequested to true and continues. It then obtains the FlushRequester from rsServices and calls its requestFlush(), passing in the HRegion itself. This FlushRequester is the HRegionServer's cacheFlusher; its requestFlush() enqueues the request, which internal worker threads then process.

private void requestFlush() {
  if (this.rsServices == null) {
    return;
  }
  synchronized (writestate) {
    if (this.writestate.isFlushRequested()) {
      return;
    }
    writestate.flushRequested = true;
  }
  // Make request outside of synchronize block; HBASE-818.
  this.rsServices.getFlushRequester().requestFlush(this, false);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
  }
}

MemStore-level limit. When any single MemStore in a region reaches the limit (hbase.hregion.memstore.flush.size, default 128 MB), a memstore flush is triggered; the check is HRegion.isFlushSize. The log output looks like this:

2017-05-27 11:27:48,017 INFO [MemStoreFlusher.1] regionserver.HRegion: Started memstore flush for TestTable,1490887422950.bab2e928889983bdaa89578ee279a1f4., current region memstore size 128.21 MB
2017-05-27 11:27:48,420 INFO [MemStoreFlusher.1] regionserver.DefaultStoreFlusher: Flushed, sequenceid=9369, memsize=128.2 M, hasBloomFilter=true, into tmp file hdfs://nn209003:9000/dev_hbase/data/default/TestTable/bab2e928889983bdaa89578ee279a1f4/.tmp/2bd6268e6b4e41689777181b0af983f8
2017-05-27 11:27:48,428 INFO [MemStoreFlusher.1] regionserver.HStore: Added hdfs://nn209003:9000/dev_hbase/data/default/TestTable/bab2e928889983bdaa89578ee279a1f4/info/2bd6268e6b4e41689777181b0af983f8, entries=89585, sequenceid=9369, filesize=90.5 M
2017-05-27 11:27:48,429 INFO [MemStoreFlusher.1] regionserver.HRegion: Finished memstore flush of ~128.21 MB/134442016, currentsize=3.82 MB/4007840 for region TestTable,1490887422950.bab2e928889983bdaa89578ee279a1f4. in 412ms, sequenceid=9369, compaction requested=true

Note that a compaction is requested once the memstore flush finishes.

Periodic MemStoreFlusher check. The MemStoreFlusher runs periodically, by default once per hour (hbase.regionserver.optionalcacheflushinterval = 3600000 ms), to ensure MemStores do not go unpersisted for too long. To avoid all MemStores flushing at the same moment, the periodic flush is given a random delay of up to roughly 20000 ms. Setting hbase.regionserver.optionalcacheflushinterval to 0 disables the periodic flush. See HRegion#shouldFlush():

boolean shouldFlush() {
  if (this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) {
    // guard against the memstore accumulating too many changes
    return true;
  }
  if (flushCheckInterval <= 0) { // disabled
    return false;
  }
  long now = EnvironmentEdgeManager.currentTimeMillis();
  // if we flushed in the recent past, we don't need to do again now
  if ((now - getLastFlushTime() < flushCheckInterval)) {
    // minimum interval between flushes;
    // hbase.regionserver.optionalcacheflushinterval, default 1 hour
    return false;
  }
  // since we didn't flush in the recent past, flush now if certain conditions
  // are met. Return true on first such memstore hit.
  for (Store s : this.getStores().values()) {
    if (s.timeOfOldestEdit() < now - flushCheckInterval) {
      // some store has an edit older than the check interval (e.g. one hour ago);
      // we have an old enough edit in the memstore, flush
      return true;
    }
  }
  return false;
}

  1. Based on sequence ids: if the memstore has accumulated too many changes since the last flush, flush
  2. Compare against the flush interval: if not enough time has passed since the last flush, do not flush (the memstore is not yet old enough)
  3. Under a very low write rate, flushes come from the periodic flush thread: if some store's oldest edit is older than the threshold (default 1 hour), a flush is scheduled after a random delay

1) Region Server-level limit: when the total size of all MemStores on a Region Server reaches the low water mark (hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size * hbase_heapsize, by default 38% of the heap), memstore flushes are forced, processing regions from the largest memstore to the next largest, until total usage drops back below the low water mark. This is done precisely to avoid reaching the hard limit (hbase.regionserver.global.memstore.size * hbase_heapsize, by default 40% of the JVM heap), at which point updates are blocked. The calculation is 0.95 * 0.4 * hbase_heapsize = 0.38 * hbase_heapsize.
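As a sanity check on the arithmetic, a minimal sketch assuming a 16 GB heap and the default parameter values (class and variable names here are illustrative, not HBase internals):

```java
public class GlobalMemstoreLimits {
    public static void main(String[] args) {
        long heapBytes = 16L * 1024 * 1024 * 1024;  // hbase_heapsize = 16 GB
        double globalSize = 0.4;   // hbase.regionserver.global.memstore.size (default)
        double lowerLimit = 0.95;  // hbase.regionserver.global.memstore.size.lower.limit (default)

        // Hard limit: above this, updates are blocked and flushes are forced.
        long upperBytes = (long) (heapBytes * globalSize);
        // Low water mark: flushing proceeds, largest memstores first, down to here.
        long lowerBytes = (long) (heapBytes * globalSize * lowerLimit);

        System.out.println("upper = " + upperBytes / (1024 * 1024) + " MB"); // 6553 MB, ~40% of heap
        System.out.println("lower = " + lowerBytes / (1024 * 1024) + " MB"); // 6225 MB, ~38% of heap
    }
}
```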

2) Region-level limit: when the total size of all MemStores in a region reaches hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size (by default 4 * 128 MB = 512 MB), a memstore flush is triggered and updates to the region are blocked. Do not set the multiplier too high, or the RegionServer's aggregate memstore usage becomes more likely to exceed hbase.regionserver.global.memstore.size, which in turn raises the chance of blocking writes to the entire RS. When a region blocks, large numbers of handler threads pile up on that region, leaving fewer threads for other regions and degrading the RegionServer as a whole. In the case observed here, hbase.hregion.memstore.block.multiplier was 2 and the memstore size of 432 MB exceeded the 256 MB threshold, so updates were blocked for 9 seconds (from second 11 to second 20 past the minute in the logs). During those 9 seconds nothing could be written, and the blocked writes likely occupied many RS handler threads, gradually starving other regions and operations and hurting overall performance. Writing asynchronously with rate limiting can avoid such blocking. For scan operations, note that hbase.client.scanner.caching was 2147483647 — consider lowering it, perhaps to 1000.

Max logs limit. When the number of HLogs (WAL files) on a Region Server reaches the limit (hbase.regionserver.maxlogs), the system picks the oldest HLog and flushes the region(s) whose edits it covers, whether or not their memstores are full. The default of 32 is far too small; the sizing formula is given below. The principle is that hbase.regionserver.hlog.blocksize * hbase.regionserver.maxlogs should be slightly larger than hbase.regionserver.global.memstore.lowerLimit * HBASE_HEAPSIZE, where hbase.regionserver.hlog.blocksize is 0.95 * HDFS block size (to avoid crossing HDFS blocks). With the newer parameter names, the right-hand side is hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size * HBASE_HEAPSIZE. In the MemStoreFlusher constructor, getGlobalMemStoreLowerMark() divides the old parameter by globalMemStorePercent (hbase.regionserver.global.memstore.size); for the new parameter it skips that conversion and simply returns lowMarkPercent.

maxLogs = Math.max(32, HBASE_HEAP_SIZE * memstoreRatio * 2 / LogRollSize)
memstoreRatio = hbase.regionserver.global.memstore.size
LogRollSize = maximum WAL file size (default 0.95 * HDFS block size)
For a 16 GB heap this works out to about 53, so the default of 32 is far from enough; for a 16 GB heap a value around 100 is reasonable — see HBASE-14951 for details. With maxlogs at 32, flush logs like the following appear:

2017-05-27 11:23:26,469 INFO [regionserver/dn209002/172.17.209.2:16020.logRoller] wal.FSHLog: Too many wals: logs=33, maxlogs=32; forcing flush of 4 regions(s): db6c43e95cba8242a43e011167024c9f, 3c12e39b30bd7d9d48017b18ce2929b9, b9534207724601f581d7b8eb93cf06cf, 8c5fcd87cd46abe7a5b691b7807a875c
2017-05-27 11:23:26,470 INFO [MemStoreFlusher.0] regionserver.HRegion: Started memstore flush for TraceV2,n\x00\x00\x00\x00\x00\x00\x00,1492655062766.db6c43e95cba8242a43e011167024c9f., current region memstore size 2.41 KB
2017-05-27 11:23:26,470 INFO [MemStoreFlusher.1] regionserver.HRegion: Started memstore flush for TraceV2,\xCA\x00\x00\x00\x00\x00\x00\x00,1492655062766.3c12e39b30bd7d9d48017b18ce2929b9., current region memstore size 688 B
2017-05-27 11:23:26,476 INFO [MemStoreFlusher.1] regionserver.DefaultStoreFlusher: Flushed, sequenceid=67, memsize=688, hasBloomFilter=true, into tmp file hdfs://nn209003:9000/dev_hbase/data/default/TraceV2/3c12e39b30bd7d9d48017b18ce2929b9/.tmp/b3ebdf4944d1498d90f38e8295d1ae22
2017-05-27 11:23:26,483 INFO [MemStoreFlusher.1] regionserver.HStore: Added hdfs://nn209003:9000/dev_hbase/data/default/TraceV2/3c12e39b30bd7d9d48017b18ce2929b9/S/b3ebdf4944d1498d90f38e8295d1ae22, entries=1, sequenceid=67, filesize=5.6 K
2017-05-27 11:23:26,484 INFO [MemStoreFlusher.1] regionserver.HRegion: Finished memstore flush of ~688 B/688, currentsize=0 B/0 for region TraceV2,\xCA\x00\x00\x00\x00\x00\x00\x00,1492655062766.3c12e39b30bd7d9d48017b18ce2929b9. in 14ms, sequenceid=67, compaction requested=false
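The HBASE-14951 heuristic above can be sketched as a small calculation. This assumes a 16 GB heap, the default memstore ratio of 0.4, and a 256 MB HDFS block size, which reproduces the figure of 53 quoted above (with 128 MB blocks the recommendation would be about twice that); the class name is illustrative:

```java
public class MaxLogsEstimate {
    public static void main(String[] args) {
        double heapMB = 16 * 1024;             // HBASE_HEAP_SIZE = 16 GB, in MB
        double memstoreRatio = 0.4;            // hbase.regionserver.global.memstore.size
        double hdfsBlockMB = 256;              // assumed HDFS block size on this cluster
        double logRollMB = 0.95 * hdfsBlockMB; // default WAL roll size

        // maxLogs = max(32, heap * memstoreRatio * 2 / LogRollSize)
        int maxLogs = (int) Math.max(32, heapMB * memstoreRatio * 2 / logRollMB);
        System.out.println("recommended maxlogs = " + maxLogs); // 53 for these inputs
    }
}
```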

Manual flush. A user can flush a table or a single region from the shell with flush 'tablename' or flush 'region name'. On the RegionServer this is driven by RSRpcServices.flushRegion(), whose code is as follows:

/**
 * Flush a region on the region server.
 *
 * @param controller the RPC controller
 * @param request the request
 * @throws ServiceException
 */
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public FlushRegionResponse flushRegion(final RpcController controller,
    final FlushRegionRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    Region region = getRegion(request.getRegion());
    LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
    boolean shouldFlush = true;
    if (request.hasIfOlderThanTs()) {
      shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
    }
    FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
    if (shouldFlush) {
      boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
          request.getWriteFlushWalMarker() : false;
      long startTime = EnvironmentEdgeManager.currentTime();
      // Go behind the curtain so we can manage writing of the flush WAL marker
      HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
          ((HRegion)region).flushcache(true, writeFlushWalMarker);
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      boolean compactionNeeded = flushResult.isCompactionNeeded();
      if (compactionNeeded) {
        regionServer.compactSplitThread.requestSystemCompaction(region,
            "Compaction through user triggered flush");
      }
      builder.setFlushed(flushResult.isFlushSucceeded());
      builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
    }
    builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
    return builder.build();
  } catch (DroppedSnapshotException ex) {
    // Cache flush can fail in a few places. If it fails in a critical
    // section, we get a DroppedSnapshotException and a replay of wal
    // is required. Currently the only way to do this is a restart of
    // the server.
    regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
    throw new ServiceException(ex);
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
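For reference, the client-side usage from the HBase shell looks like the following sketch (it needs a running cluster; the table and region names are the ones from the logs above):

```shell
# In the HBase shell:
flush 'TestTable'                                                 # flush all regions of the table
flush 'TestTable,1490887422950.bab2e928889983bdaa89578ee279a1f4.' # flush one region by its full name
```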

Region merge. This is not a compaction but a merge of two regions. RSRpcServices.mergeRegions() first calls flush on regionA and then regionB to flush each region's MemStore, and only then performs the merge:

/**
 * Merge regions on the region server.
 *
 * @param controller the RPC controller
 * @param request the request
 * @return merge regions response
 * @throws ServiceException
 */
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public MergeRegionsResponse mergeRegions(final RpcController controller,
    final MergeRegionsRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    Region regionA = getRegion(request.getRegionA());
    Region regionB = getRegion(request.getRegionB());
    boolean forcible = request.getForcible();
    long masterSystemTime = request.hasMasterSystemTime() ?
        request.getMasterSystemTime() : -1;
    regionA.startRegionOperation(Operation.MERGE_REGION);
    regionB.startRegionOperation(Operation.MERGE_REGION);
    if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
        regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
      throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
    }
    LOG.info("Receiving merging request for " + regionA + ", " + regionB +
        ",forcible=" + forcible);
    long startTime = EnvironmentEdgeManager.currentTime();
    FlushResult flushResult = regionA.flush(true);
    if (flushResult.isFlushSucceeded()) {
      long endTime = EnvironmentEdgeManager.currentTime();
      regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
    }
    startTime = EnvironmentEdgeManager.currentTime();
    flushResult = regionB.flush(true);
    if (flushResult.isFlushSucceeded()) {
      long endTime = EnvironmentEdgeManager.currentTime();
      regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
    }
    regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
        masterSystemTime, RpcServer.getRequestUser());
    return MergeRegionsResponse.newBuilder().build();
  } catch (DroppedSnapshotException ex) {
    regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
    throw new ServiceException(ex);
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

Region split. RSRpcServices.splitRegion() likewise first flushes the region's MemStore via flush() before splitting:

/**
 * Split a region on the region server.
 *
 * @param controller the RPC controller
 * @param request the request
 * @throws ServiceException
 */
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public SplitRegionResponse splitRegion(final RpcController controller,
    final SplitRegionRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    Region region = getRegion(request.getRegion());
    region.startRegionOperation(Operation.SPLIT_REGION);
    if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
      throw new IOException("Can't split replicas directly. " +
          "Replicas are auto-split when their primary is split.");
    }
    LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
    long startTime = EnvironmentEdgeManager.currentTime();
    FlushResult flushResult = region.flush(true);
    if (flushResult.isFlushSucceeded()) {
      long endTime = EnvironmentEdgeManager.currentTime();
      regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
    }
    byte[] splitPoint = null;
    if (request.hasSplitPoint()) {
      splitPoint = request.getSplitPoint().toByteArray();
    }
    ((HRegion)region).forceSplit(splitPoint);
    regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
        RpcServer.getRequestUser());
    return SplitRegionResponse.newBuilder().build();
  } catch (DroppedSnapshotException ex) {
    regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
    throw new ServiceException(ex);
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

In summary, keep a close eye on MemStore size and the size of the MemStore flush queue. Ideally, total MemStore usage should stay below hbase.regionserver.global.memstore.size (formerly hbase.regionserver.global.memstore.upperLimit), and the flush queue size should not grow continuously.


Source: https://oomspot.com/post/hbase-memstore-tuning

