本文主要研究一下storm TridentWindowManager的pendingTriggers

storm的工作节点(聊聊stormTridentWindowManager的pendingTriggers)(1)

TridentBoltExecutor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) { boolean success = true; try { _bolt.finishBatch(tracked.info); String stream = COORD_STREAM(tracked.info.batchGroup); for(Integer task: tracked.condition.targetTasks) { _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0))); } if(tracked.delayedAck!=null) { _collector.ack(tracked.delayedAck); tracked.delayedAck = null; } } catch(FailedException e) { failBatch(tracked, e); success = false; } _batches.remove(tracked.info.batchId.getId()); return success; }

SubtopologyBolt.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

public void finishBatch(BatchInfo batchInfo) { for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) { p.finishBatch((ProcessorContext) batchInfo.state); } }

WindowTridentProcessor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) { // add tuple to the batch state Object state = processorContext.state[tridentContext.getStateIndex()]; ((List<TridentTuple>) state).add(projection.create(tuple)); } ​ public void finishBatch(ProcessorContext processorContext) { ​ Object batchId = processorContext.batchId; Object batchTxnId = getBatchTxnId(batchId); ​ LOG.debug("Received finishBatch of : [{}] ", batchId); // get all the tuples in a batch and add it to trident-window-manager List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()]; tridentWindowManager.addTuplesBatch(batchId, tuples); ​ List<Integer> pendingTriggerIds = null; List<String> triggerKeys = new ArrayList<>(); Iterable<Object> triggerValues = null; ​ if (retriedAttempt(batchId)) { pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId)); if (pendingTriggerIds != null) { for (Integer pendingTriggerId : pendingTriggerIds) { triggerKeys.add(triggerKey(pendingTriggerId)); } triggerValues = windowStore.get(triggerKeys); } } ​ // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers. if(triggerValues == null) { pendingTriggerIds = new ArrayList<>(); Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers(); LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size()); try { Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator(); List<Object> values = new ArrayList<>(); StoreBasedTridentWindowManager.TriggerResult triggerResult = null; while (pendingTriggersIter.hasNext()) { triggerResult = pendingTriggersIter.next(); for (List<Object> aggregatedResult : triggerResult.result) { String triggerKey = triggerKey(triggerResult.id); triggerKeys.add(triggerKey); values.add(aggregatedResult); pendingTriggerIds.add(triggerResult.id); } pendingTriggersIter.remove(); } triggerValues = values; } finally { // store inprocess triggers of a batch in store for batch retries for any failures if (!pendingTriggerIds.isEmpty()) { windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds); } } } ​ collector.setContext(processorContext); int i = 0; for (Object resultValue : triggerValues) { collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i )), (List<Object>) resultValue)); } collector.setContext(null); }

StoreBasedTridentWindowManager.addTuplesBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java

public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) { LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId); List<WindowsStore.Entry> entries = new ArrayList<>(); for (int i = 0; i < tuples.size(); i ) { String key = keyOf(batchId); TridentTuple tridentTuple = tuples.get(i); entries.add(new WindowsStore.Entry(key i, tridentTuple.select(inputFields))); } ​ // tuples should be available in store before they are added to window manager windowStore.putAll(entries); ​ for (int i = 0; i < tuples.size(); i ) { String key = keyOf(batchId); TridentTuple tridentTuple = tuples.get(i); addToWindowManager(i, key, tridentTuple); } ​ } ​ private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) { TridentTuple actualTuple = null; if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) { actualTuple = tridentTuple; } currentCachedTuplesSize.incrementAndGet(); windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple)); }

WindowManager.add

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

private final ConcurrentLinkedQueue<Event<T>> queue; ​ /** * Add an event into the window, with {@link System#currentTimeMillis()} as * the tracking ts. * * @param event the event to add */ public void add(T event) { add(event, System.currentTimeMillis()); } ​ /** * Add an event into the window, with the given ts as the tracking ts. * * @param event the event to track * @param ts the timestamp */ public void add(T event, long ts) { add(new EventImpl<T>(event, ts)); } ​ /** * Tracks a window event * * @param windowEvent the window event to track */ public void add(Event<T> windowEvent) { // watermark events are not added to the queue. if (!windowEvent.isWatermark()) { queue.add(windowEvent); } else { LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp()); } track(windowEvent); compactWindow(); }

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/** * The callback invoked by the trigger policy. */ @Override public boolean onTrigger() { List<Event<T>> windowEvents = null; List<T> expired = null; try { lock.lock(); /* * scan the entire window to handle out of order events in * the case of time based windows. */ windowEvents = scanEvents(true); expired = new ArrayList<>(expiredEvents); expiredEvents.clear(); } finally { lock.unlock(); } List<T> events = new ArrayList<>(); List<T> newEvents = new ArrayList<>(); for (Event<T> event : windowEvents) { events.add(event.get()); if (!prevWindowEvents.contains(event)) { newEvents.add(event.get()); } } prevWindowEvents.clear(); if (!events.isEmpty()) { prevWindowEvents.addAll(windowEvents); LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size()); windowLifecycleListener.onActivation(events, newEvents, expired); } else { LOG.debug("No events in the window, skipping onActivation"); } triggerPolicy.reset(); return !events.isEmpty(); }

WindowManager.scanEvents

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

/** * Scan events in the queue, using the expiration policy to check * if the event should be evicted or not. * * @param fullScan if set, will scan the entire queue; if not set, will stop * as soon as an event not satisfying the expiration policy is found * @return the list of events to be processed as a part of the current window */ private List<Event<T>> scanEvents(boolean fullScan) { LOG.debug("Scan events, eviction policy {}", evictionPolicy); List<T> eventsToExpire = new ArrayList<>(); List<Event<T>> eventsToProcess = new ArrayList<>(); try { lock.lock(); Iterator<Event<T>> it = queue.iterator(); while (it.hasNext()) { Event<T> windowEvent = it.next(); Action action = evictionPolicy.evict(windowEvent); if (action == EXPIRE) { eventsToExpire.add(windowEvent.get()); it.remove(); } else if (!fullScan || action == STOP) { break; } else if (action == PROCESS) { eventsToProcess.add(windowEvent); } } expiredEvents.addAll(eventsToExpire); } finally { lock.unlock(); } eventsSinceLastExpiry.set(0); LOG.debug("[{}] events expired from window.", eventsToExpire.size()); if (!eventsToExpire.isEmpty()) { LOG.debug("invoking windowLifecycleListener.onExpiry"); windowLifecycleListener.onExpiry(eventsToExpire); } return eventsToProcess; }

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

/** * Listener to reeive any activation/expiry of windowing events and take further action on them. */ class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> { ​ @Override public void onExpiry(List<T> expiredEvents) { LOG.debug("onExpiry is invoked"); onTuplesExpired(expiredEvents); } ​ @Override public void onActivation(List<T> events, List<T> newEvents, List<T> expired) { LOG.debug("onActivation is invoked with events size: [{}]", events.size()); // trigger occurred, create an aggregation and keep them in store int currentTriggerId = triggerId.incrementAndGet(); execAggregatorAndStoreResult(currentTriggerId, events); } } ​ private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) { List<TridentTuple> resultTuples = getTridentTuples(tupleEvents); ​ // run aggregator to compute the result AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector); Object state = aggregator.init(currentTriggerId, collector); for (TridentTuple resultTuple : resultTuples) { aggregator.aggregate(state, resultTuple, collector); } aggregator.complete(state, collector); ​ List<List<Object>> resultantAggregatedValue = collector.values; ​ ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId 1), new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue)); windowStore.putAll(entries); ​ pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue)); }

小结

doc

,