A Look at Storm's reportError
Published: 2019-06-28



This article takes a look at Storm's reportError mechanism.

IErrorReporter

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java

public interface IErrorReporter {
    void reportError(Throwable error);
}
  • The ISpoutOutputCollector, IOutputCollector, and IBasicOutputCollector interfaces all extend IErrorReporter.

ISpoutOutputCollector

storm-core/1.2.2/storm-core-1.2.2-sources.jar!/org/apache/storm/spout/ISpoutOutputCollector.java

public interface ISpoutOutputCollector extends IErrorReporter {
    /**
     * Returns the task ids that received the tuples.
     */
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);

    long getPendingCount();
}
  • Implementations of ISpoutOutputCollector include SpoutOutputCollector and SpoutOutputCollectorImpl.

IOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java

public interface IOutputCollector extends IErrorReporter {
    /**
     * Returns the task ids that received the tuples.
     */
    List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void ack(Tuple input);

    void fail(Tuple input);

    void resetTimeout(Tuple input);

    void flush();
}
  • Implementations of IOutputCollector include OutputCollector and BoltOutputCollectorImpl; OutputCollector is the user-facing wrapper that simply forwards calls, including reportError, to the underlying collector (a small sketch of that delegation follows).
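
OutputCollector itself contains no reporting logic; it wraps the executor-provided IOutputCollector and forwards every call to it. A minimal sketch of that delegation (hypothetical class name, not the full OutputCollector source) looks like this:

import org.apache.storm.task.IOutputCollector;

// Sketch only: illustrates the forwarding done by the user-facing collector.
// At runtime the wrapped delegate is BoltOutputCollectorImpl, so a user call to
// collector.reportError(t) ends up in the executor-side implementation shown below.
public class ReportingCollectorWrapper {
    private final IOutputCollector delegate;

    public ReportingCollectorWrapper(IOutputCollector delegate) {
        this.delegate = delegate;
    }

    public void reportError(Throwable error) {
        delegate.reportError(error);
    }
}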

IBasicOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java

public interface IBasicOutputCollector extends IErrorReporter {
    List<Integer> emit(String streamId, List<Object> tuple);

    void emitDirect(int taskId, String streamId, List<Object> tuple);

    void resetTimeout(Tuple tuple);
}
  • The implementation of IBasicOutputCollector is BasicOutputCollector.

reportError

SpoutOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

@Override
public void reportError(Throwable error) {
    executor.getErrorReportingMetrics().incrReportedErrorCount();
    executor.getReportError().report(error);
}

BoltOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

@Override
public void reportError(Throwable error) {
    executor.getErrorReportingMetrics().incrReportedErrorCount();
    executor.getReportError().report(error);
}

As shown above, the reportError methods of SpoutOutputCollectorImpl and BoltOutputCollectorImpl both increment the reported-error metric and then delegate to executor.getReportError().report(error).

ReportError.report

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java

public class ReportError implements IReportError {
    private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);
    private final Map<String, Object> topoConf;
    private final IStormClusterState stormClusterState;
    private final String stormId;
    private final String componentId;
    private final WorkerTopologyContext workerTopologyContext;
    private int maxPerInterval;
    private int errorIntervalSecs;
    private AtomicInteger intervalStartTime;
    private AtomicInteger intervalErrors;

    public ReportError(Map<String, Object> topoConf, IStormClusterState stormClusterState, String stormId,
                       String componentId, WorkerTopologyContext workerTopologyContext) {
        this.topoConf = topoConf;
        this.stormClusterState = stormClusterState;
        this.stormId = stormId;
        this.componentId = componentId;
        this.workerTopologyContext = workerTopologyContext;
        this.errorIntervalSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
        this.maxPerInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
        this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
        this.intervalErrors = new AtomicInteger(0);
    }

    @Override
    public void report(Throwable error) {
        LOG.error("Error", error);
        if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
            intervalErrors.set(0);
            intervalStartTime.set(Time.currentTimeSecs());
        }
        if (intervalErrors.incrementAndGet() <= maxPerInterval) {
            try {
                stormClusterState.reportError(stormId, componentId, Utils.hostname(),
                        workerTopologyContext.getThisWorkerPort().longValue(), error);
            } catch (UnknownHostException e) {
                throw Utils.wrapInRuntime(e);
            }
        }
    }
}
  • report first checks whether the throttle interval needs to be reset, then whether the number of errors in the current interval has exceeded the per-interval maximum; if it has not, stormClusterState.reportError is called to persist the error to the cluster state store (e.g. ZooKeeper). The two thresholds come from the topology configuration, as the sketch below shows.
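
The thresholds are read from Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS and Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL. As a hedged example (the values here are arbitrary), a topology could tune the throttling at submission time like this:

import org.apache.storm.Config;

public class ErrorThrottleConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        // Arbitrary example values: persist at most 5 reported errors per 10-second interval.
        // Errors beyond the cap are still logged by the worker but skipped by ReportError.report.
        conf.put(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS, 10);
        conf.put(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL, 5);
        return conf;
    }
}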

StormClusterStateImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

@Override
public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
    String path = ClusterUtils.errorPath(stormId, componentId);
    String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
    ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
    errorInfo.set_host(node);
    errorInfo.set_port(port.intValue());
    byte[] serData = Utils.serialize(errorInfo);
    stateStorage.mkdirs(path, defaultAcls);
    stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
    stateStorage.set_data(lastErrorPath, serData, defaultAcls);
    List<String> childrens = stateStorage.get_children(path, false);

    Collections.sort(childrens, new Comparator<String>() {
        public int compare(String arg0, String arg1) {
            return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
        }
    });

    while (childrens.size() > 10) {
        String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0);
        try {
            stateStorage.delete_node(znodePath);
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                // if the node is already deleted, do nothing
                LOG.warn("Could not find the znode: {}", znodePath);
            } else {
                throw e;
            }
        }
    }
}
  • ClusterUtils.errorPath(stormId, componentId) gives the directory error entries are appended to, and ClusterUtils.lastErrorPath(stormId, componentId) gives the path the latest error is written to.
  • Because ZooKeeper is not suited to storing large amounts of data, whenever that directory has more than 10 children the surplus nodes are deleted: the children are sorted in ascending order by the numeric part of the node name (substring(1)) and the oldest are removed one by one, as the sketch after this list illustrates.
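
To make the trimming concrete, here is a small self-contained sketch (plain Java, no ZooKeeper involved) that orders sequential names such as e0000000290 by their numeric suffix and drops everything beyond the 10 most recent, mirroring the loop above:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ErrorNodeTrimSketch {
    private static final int MAX_RETAINED = 10;

    public static void main(String[] args) {
        // Names as they would appear under /errors/{stormId}/{componentId}.
        List<String> children = new ArrayList<>(List.of(
            "e0000000291", "e0000000290", "e0000000295", "e0000000294", "e0000000293", "e0000000292",
            "e0000000299", "e0000000298", "e0000000297", "e0000000296", "e0000000300", "e0000000301"));

        // Sort ascending by the numeric part after the leading 'e' (substring(1)),
        // the same ordering used before deletion in reportError.
        children.sort(Comparator.comparingLong(name -> Long.parseLong(name.substring(1))));

        // Remove the oldest entries until only the 10 most recent remain.
        while (children.size() > MAX_RETAINED) {
            System.out.println("would delete znode: " + children.remove(0));
        }
        System.out.println("retained: " + children);
    }
}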

ClusterUtils.errorPath

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

public static final String ZK_SEPERATOR = "/";
public static final String ERRORS_ROOT = "errors";
public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;

public static String errorPath(String stormId, String componentId) {
    try {
        return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
    } catch (UnsupportedEncodingException e) {
        throw Utils.wrapInRuntime(e);
    }
}

public static String lastErrorPath(String stormId, String componentId) {
    return errorPath(stormId, componentId) + "-last-error";
}

public static String errorStormRoot(String stormId) {
    return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
}
  • errorPath is /errors/{stormId}/{componentId}; sequential child znodes prefixed with "e" are created under it via create_sequential, each error is first appended there, and once there are more than 10 children the oldest are deleted.
  • lastErrorPath is /errors/{stormId}/{componentId}-last-error and stores the most recent error of that componentId. A small sketch of the resulting paths follows.
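
The following self-contained sketch (hypothetical class, reusing the ClusterUtils constants above) prints the paths produced for the stormId and componentId used in the zkCli session below; in zkCli they appear under /storm/... because the cluster state lives under Storm's configured ZooKeeper root (/storm by default):

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class ErrorPathSketch {
    private static final String ZK_SEPERATOR = "/";
    private static final String ERRORS_SUBTREE = ZK_SEPERATOR + "errors";

    static String errorPath(String stormId, String componentId) throws UnsupportedEncodingException {
        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
    }

    static String lastErrorPath(String stormId, String componentId) throws UnsupportedEncodingException {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        // Prints /errors/reportErrorDemo-1-1540260375/print
        System.out.println(errorPath("reportErrorDemo-1-1540260375", "print"));
        // Prints /errors/reportErrorDemo-1-1540260375/print-last-error
        System.out.println(lastErrorPath("reportErrorDemo-1-1540260375", "print"));
    }
}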

Inspecting with zkCli

[zk: localhost:2181(CONNECTED) 21] ls /storm/errors
[DRPCStateQuery-1-1540185943, reportErrorDemo-1-1540260375]
[zk: localhost:2181(CONNECTED) 22] ls /storm/errors/reportErrorDemo-1-1540260375
[print, print-last-error]
[zk: localhost:2181(CONNECTED) 23] ls /storm/errors/reportErrorDemo-1-1540260375/print
[e0000000291, e0000000290, e0000000295, e0000000294, e0000000293, e0000000292, e0000000299, e0000000298, e0000000297, e0000000296]
[zk: localhost:2181(CONNECTED) 24] ls /storm/errors/reportErrorDemo-1-1540260375/print/e0000000299
[]
[zk: localhost:2181(CONNECTED) 25] ls /storm/errors/reportErrorDemo-1-1540260375/print-last-error
[]

storm-ui

curl -i http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false
  • The Storm UI requests the endpoint above to fetch the topology data; each spout and bolt entry includes a lastError field showing the most recent error.

StormApiResource

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java

/**
 * /api/v1/topology -> topo.
 */
@GET
@Path("/topology/{id}")
@AuthNimbusOp(value = "getTopology", needsTopoId = true)
@Produces("application/json")
public Response getTopology(@PathParam("id") String id,
                            @DefaultValue(":all-time") @QueryParam("window") String window,
                            @QueryParam("sys") boolean sys,
                            @QueryParam(callbackParameterName) String callback) throws TException {
    topologyPageRequestMeter.mark();
    try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(config)) {
        return UIHelpers.makeStandardResponse(
                UIHelpers.getTopologySummary(
                        nimbusClient.getClient().getTopologyPageInfo(id, window, sys),
                        window, config,
                        servletRequest.getRemoteUser()
                ),
                callback
        );
    }
}
  • This calls nimbusClient.getClient().getTopologyPageInfo(id, window, sys).

Nimbus.getTopologyPageInfo

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

@Override
public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys)
    throws NotAliveException, AuthorizationException, TException {
    try {
        getTopologyPageInfoCalls.mark();
        CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyPageInfo");
        String topoName = common.topoName;
        IStormClusterState state = stormClusterState;
        int launchTimeSecs = common.launchTimeSecs;
        Assignment assignment = common.assignment;
        Map<List<Integer>, Map<String, Object>> beats = common.beats;
        Map<Integer, String> taskToComp = common.taskToComponent;
        StormTopology topology = common.topology;
        Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
        StormBase base = common.base;
        if (base == null) {
            throw new WrappedNotAliveException(topoId);
        }
        Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
        List<WorkerSummary> workerSummaries = null;
        Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
        if (assignment != null) {
            Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
            Map<String, String> nodeToHost = assignment.get_node_host();
            for (Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
                NodeInfo ni = entry.getValue();
                List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
                exec2NodePort.put(entry.getKey(), nodePort);
            }
            workerSummaries = StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats, exec2NodePort,
                                                       nodeToHost, workerToResources, includeSys,
                                                       true); //this is the topology page, so we know the user is authorized
        }
        TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId, exec2NodePort, taskToComp, beats,
                                                                    topology, window, includeSys, state);
        //......
        return topoPageInfo;
    } catch (Exception e) {
        LOG.warn("Get topo page info exception. (topology id='{}')", topoId, e);
        if (e instanceof TException) {
            throw (TException) e;
        }
        throw new RuntimeException(e);
    }
}
  • getTopologyPageInfo calls StatsUtil.aggTopoExecsStats to build the TopologyPageInfo.

StatsUtil.aggTopoExecsStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

/**
 * aggregate topo executors stats.
 *
 * @param topologyId     topology id
 * @param exec2nodePort  executor -> host+port
 * @param task2component task -> component
 * @param beats          executor[start, end] -> executor heartbeat
 * @param topology       storm topology
 * @param window         the window to be aggregated
 * @param includeSys     whether to include system streams
 * @param clusterState   cluster state
 * @return TopologyPageInfo thrift structure
 */
public static TopologyPageInfo aggTopoExecsStats(
    String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats,
    StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
    List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
    Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList);
    return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
}
  • StatsUtil.aggTopoExecsStats finally calls postAggregateTopoStats.

StatsUtil.postAggregateTopoStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

private static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData,
                                                       String topologyId, IStormClusterState clusterState) {
    TopologyPageInfo ret = new TopologyPageInfo(topologyId);
    ret.set_num_tasks(task2comp.size());
    ret.set_num_workers(((Set) accData.get(WORKERS_SET)).size());
    ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);

    Map bolt2stats = ClientStatsUtil.getMapByKey(accData, BOLT_TO_STATS);
    Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
    for (Object o : bolt2stats.entrySet()) {
        Map.Entry e = (Map.Entry) o;
        Map m = (Map) e.getValue();
        long executed = getByKeyOr0(m, EXECUTED).longValue();
        if (executed > 0) {
            double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
            m.put(EXEC_LATENCY, execLatencyTotal / executed);
            double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
            m.put(PROC_LATENCY, procLatencyTotal / executed);
        }
        m.remove(EXEC_LAT_TOTAL);
        m.remove(PROC_LAT_TOTAL);
        String id = (String) e.getKey();
        m.put("last-error", getLastError(clusterState, topologyId, id));
        aggBolt2stats.put(id, thriftifyBoltAggStats(m));
    }
    //......
    return ret;
}

private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
    return stormClusterState.lastError(stormId, compId);
}
  • Here the last-error entry is added via getLastError, and each per-component stats map is then converted into a thrift object by thriftifyBoltAggStats.
  • getLastError calls stormClusterState.lastError(stormId, compId) to read the last error; a sketch of what that lookup does follows.
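
Judging from the write path shown earlier, lastError presumably just reads the component's -last-error znode and deserializes it back into an ErrorInfo. A hedged sketch of that lookup (not the verbatim StormClusterStateImpl source):

import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.utils.Utils;

public class LastErrorLookupSketch {
    // Sketch only: read the "-last-error" znode written by reportError and
    // deserialize it back into an ErrorInfo; return null if nothing has been written.
    public static ErrorInfo lastError(IStateStorage stateStorage, String stormId, String componentId) {
        String path = ClusterUtils.lastErrorPath(stormId, componentId);
        byte[] data = stateStorage.get_data(path, false);
        return data == null ? null : Utils.deserialize(data, ErrorInfo.class);
    }
}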

UIHelpers.getTopologySummary

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

/**
 * getTopologySummary.
 * @param topologyPageInfo topologyPageInfo
 * @param window window
 * @param config config
 * @param remoteUser remoteUser
 * @return getTopologySummary
 */
public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo, String window,
                                                     Map<String, Object> config, String remoteUser) {
    Map<String, Object> result = new HashMap();
    Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
    long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
    Map<String, Object> unpackedTopologyPageInfo = unpackTopologyInfo(topologyPageInfo, window, config);
    result.putAll(unpackedTopologyPageInfo);
    result.put("user", remoteUser);
    result.put("window", window);
    result.put("windowHint", getWindowHint(window));
    result.put("msgTimeout", messageTimeout);
    result.put("configuration", topologyConf);
    result.put("visualizationTable", new ArrayList());
    result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
    return result;
}
  • Once the TopologyPageInfo has been obtained, UIHelpers.getTopologySummary unpacks it via unpackTopologyInfo.

UIHelpers.unpackTopologyInfo

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

/**
 * unpackTopologyInfo.
 * @param topologyPageInfo topologyPageInfo
 * @param window window
 * @param config config
 * @return unpackTopologyInfo
 */
private static Map<String, Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String, Object> config) {
    Map<String, Object> result = new HashMap();
    result.put("id", topologyPageInfo.get_id());
    //......
    Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats();
    List<Map> spoutStats = new ArrayList();
    for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
        spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
    }
    result.put("spouts", spoutStats);

    Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
    List<Map> boltStats = new ArrayList();
    for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
        boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
    }
    result.put("bolts", boltStats);
    //......
    result.put("samplingPct", samplingPct);
    result.put("replicationCount", topologyPageInfo.get_replication_count());
    result.put("topologyVersion", topologyPageInfo.get_topology_version());
    result.put("stormVersion", topologyPageInfo.get_storm_version());
    return result;
}

/**
 * getTopologySpoutAggStatsMap.
 * @param componentAggregateStats componentAggregateStats
 * @param spoutId spoutId
 * @return getTopologySpoutAggStatsMap
 */
private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats, String spoutId) {
    Map<String, Object> result = new HashMap();
    CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
    result.putAll(getCommonAggStatsMap(commonStats));
    result.put("spoutId", spoutId);
    result.put("encodedSpoutId", URLEncoder.encode(spoutId));
    SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
    result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
    ErrorInfo lastError = componentAggregateStats.get_last_error();
    result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
    return result;
}

/**
 * getTopologyBoltAggStatsMap.
 * @param componentAggregateStats componentAggregateStats
 * @param boltId boltId
 * @return getTopologyBoltAggStatsMap
 */
private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats, String boltId) {
    Map<String, Object> result = new HashMap();
    CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
    result.putAll(getCommonAggStatsMap(commonStats));
    result.put("boltId", boltId);
    result.put("encodedBoltId", URLEncoder.encode(boltId));
    BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
    result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
    result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
    result.put("executed", boltAggregateStats.get_executed());
    result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
    ErrorInfo lastError = componentAggregateStats.get_last_error();
    result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
    return result;
}

/**
 * getTruncatedErrorString.
 * @param errorString errorString
 * @return getTruncatedErrorString
 */
private static String getTruncatedErrorString(String errorString) {
    return errorString.substring(0, Math.min(errorString.length(), 200));
}
  • Note that getTopologySpoutAggStatsMap is used for spouts and getTopologyBoltAggStatsMap for bolts.
  • Both run lastError through getTruncatedErrorString, which truncates the error string to at most 200 characters (substring(0, 200)).

crash log

2018-10-23 02:53:28.118 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.129 o.a.s.d.executor Thread-10-print-executor[7 7] [ERROR]
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.175 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__11404$fn__11405.invoke(worker.clj:792) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__10612$fn__10613.invoke(executor.clj:281) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:494) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-10-23 02:53:28.176 o.a.s.d.worker Thread-41 [INFO] Shutting down worker reportErrorDemo-2-1540263136 f9856902-cfe9-45c7-b675-93a29d3d3d36 6700
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Terminating messaging context
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Shutting down executors
2018-10-23 02:53:28.177 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[8 8]
2018-10-23 02:53:28.182 o.a.s.util Thread-3-disruptor-executor[8 8]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.186 o.a.s.util Thread-4-spout-executor[8 8] [INFO] Async loop interrupted!
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[8 8]
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[12 12]
2018-10-23 02:53:28.189 o.a.s.util Thread-5-disruptor-executor[12 12]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.util Thread-6-spout-executor[12 12] [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[12 12]
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shutting down executor count:[2 2]
2018-10-23 02:53:28.191 o.a.s.util Thread-7-disruptor-executor[2 2]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.193 o.a.s.util Thread-8-count-executor[2 2] [INFO] Async loop interrupted!
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shut down executor count:[2 2]
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shutting down executor print:[7 7]
2018-10-23 02:53:28.196 o.a.s.util Thread-9-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!

Summary

  • If a spout or bolt method throws an uncaught exception, the whole worker dies; the exception is still recorded to ZooKeeper automatically, but at the cost of the worker dying and being restarted over and over.
  • reportError can be used together with try/catch so that the worker does not die when an exception occurs while the error is still recorded; note that only the 10 most recent errors are kept per component of a topology (as sequential znodes under the component's error path), plus the latest error in a separate -last-error znode, and all of this is deleted when the topology is killed. An example is sketched after this list.
  • The Storm UI displays each component's lastError; the error string shown is truncated to at most 200 characters.
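
As an illustration of the second point, here is a hedged sketch of a bolt (hypothetical class and field names, written against the 2.0.0 API, not the ErrorPrintBolt from the crash log) that catches the exception, reports it, and keeps the worker alive:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical bolt for illustration: parsing errors are reported instead of crashing the worker.
public class SafePrintBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // Assumes the tuple carries an "id" field; this throws if the value is not an Integer.
            Integer id = input.getIntegerByField("id");
            System.out.println("got id " + id);
            collector.ack(input);
        } catch (Exception e) {
            // Record the error to cluster state (ZooKeeper) without killing the worker;
            // it then shows up as lastError / in the error list on the Storm UI.
            collector.reportError(e);
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams
    }
}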

doc

Reposted from: https://my.oschina.net/go4it/blog/2251322
