Source Code Reading Preparation

Download the official Apache Hadoop 2.9.2 source code.
Import the source into IntelliJ IDEA.
Wait for the downloads and dependency resolution to complete; the source import succeeds.
NameNode Startup Flow

An HDFS cluster is started with the start-dfs.sh command. This command starts HDFS's NameNode as well as the DataNodes; the NameNode is started mainly through the org.apache.hadoop.hdfs.server.namenode.NameNode class.

We focus on what work the NameNode does during startup (technical details that stray from the main line are not pursued).

For analyzing the startup flow, two pieces of code matter most: the static initializer block and the main() method.
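For reference, here are those two entry points, abridged from NameNode.java: the static block loads the HDFS configuration resources, and main() parses the arguments, builds the NameNode through createNameNode(), and then blocks in join() until shutdown.

```java
// Abridged from NameNode.java
static {
  HdfsConfiguration.init();  // load hdfs-default.xml / hdfs-site.xml resources
}

public static void main(String argv[]) throws Exception {
  if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  try {
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    // The core of the startup: create (and internally start) the NameNode
    NameNode namenode = createNameNode(argv, null);
    if (namenode != null) {
      namenode.join();  // block until the NameNode stops
    }
  } catch (Throwable e) {
    LOG.error("Failed to start namenode.", e);
    terminate(1, e);
  }
}
```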
```java
// createNameNode()
public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(argv));
  if (conf == null)
    conf = new HdfsConfiguration();
  // Parse out some generic args into Configuration.
  GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
  argv = hParser.getRemainingArgs();
  // Parse the rest, NN specific args.
  // Parse the startup options
  StartupOption startOpt = parseArguments(argv);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);

  switch (startOpt) {
    ....
    default: {
      // A normal startup takes this branch.
      // Initialize the metrics system
      DefaultMetricsSystem.initialize("NameNode");
      // Return a new NameNode
      return new NameNode(conf);
    }
  }
}
```
```java
// NameNode constructors
public NameNode(Configuration conf) throws IOException {
  this(conf, NamenodeRole.NAMENODE);
}

protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  super(conf);
  this.tracer = new Tracer.Builder("NameNode").
      conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
      build();
  this.tracerConfigurationManager =
      new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
  this.role = role;
  // Set NameNode#clientNamenodeAddress, e.g. "hdfs://localhost:9000"
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  // HA-related setup
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  state = createHAState(getStartupOption(conf));
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    // The core NameNode initialization; the remaining code concerns HA clusters
    initialize(getConf());
    try {
      haContext.writeLock();
      state.prepareToEnterState(haContext);
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stopAtException(e);
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stopAtException(e);
    throw e;
  }
  this.started.set(true);
}
```

Even though HA is not enabled locally (haEnabled = false), the NameNode still holds an HAState, and that HAState is active.
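This note can be confirmed in createHAState() (abridged from NameNode.java): when HA is disabled, or during an upgrade, the NameNode goes straight to the active state; otherwise it starts as standby.

```java
// Abridged from NameNode.java
protected HAState createHAState(StartupOption startOpt) {
  if (!haEnabled || startOpt == StartupOption.UPGRADE
      || startOpt == StartupOption.UPGRADEONLY) {
    return ACTIVE_STATE;   // non-HA NameNodes are always active
  } else {
    return STANDBY_STATE;  // HA NameNodes start as standby
  }
}
```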
```java
// Initialize the NameNode
protected void initialize(Configuration conf) throws IOException {
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
    String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
    if (intervals != null) {
      conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS, intervals);
    }
  }
  UserGroupInformation.setConfiguration(conf);
  loginAsNameNodeUser(conf);
  // Initialize the NameNode metrics system
  NameNode.initMetrics(conf, this.getRole());
  StartupProgressMetrics.register(startupProgress);
  // Initialize the JVM pause monitor
  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);  // start the HttpServer
  }

  loadNamesystem(conf);  // load the metadata from disk

  // Create the RPC server backing the protocols the NameNode uses to
  // communicate with DataNodes and clients
  rpcServer = createRpcServer(conf);

  initReconfigurableBackoffKey();

  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using
    // the RPC server's bind address.
    clientNamenodeAddress = NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  // This method is very important: it starts many essential worker threads
  startCommonServices(conf);
  startMetricsLogger(conf);
}
```
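loadNamesystem(conf) is where the on-disk metadata enters the picture: it delegates to FSNamesystem.loadFromDisk(), which loads the fsimage checkpoint and then replays the edits log on top of it. A heavily abridged sketch (recovery handling, timing, and metrics elided):

```java
// Heavily abridged sketch based on FSNamesystem.loadFromDisk()
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  checkConfiguration(conf);
  // FSImage manages both the fsimage checkpoint files and the edits log
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  ...
  // Load the latest fsimage, then replay any newer edits on top of it
  namesystem.loadFSImage(startOpt);
  ...
  return namesystem;
}
```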
```java
private void startCommonServices(Configuration conf) throws IOException {
  // Create the NameNode resource-checker thread and the block manager
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();
  // Roles other than NamenodeRole.NAMENODE start their HttpServer here
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  rpcServer.start();  // start the RPC server
  try {
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  // Start each plugin
  for (ServicePlugin p : plugins) {
    try {
      p.start(this);
    } catch (Throwable t) {
      LOG.warn("ServicePlugin " + p + " could not be started", t);
    }
  }
  LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}
```
```java
void startCommonServices(Configuration conf, HAContext haContext)
    throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    // Create the NameNodeResourceChecker and run one check immediately
    nnResourceChecker = new NameNodeResourceChecker(conf);
    checkAvailableResources();
    assert !blockManager.isPopulatingReplQueues();
    // Record startup-progress information
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
    long completeBlocksTotal = getCompleteBlocksTotal();
    // Record the total number of complete blocks
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);
    // Activate the BlockManager
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}
```

blockManager.activate(conf, completeBlocksTotal) activates the BlockManager, which mainly starts the PendingReplicationMonitor, DecommissionManager#Monitor, HeartbeatManager#Monitor, and ReplicationMonitor threads.
```java
public void activate(Configuration conf, long blockTotal) {
  // Start the PendingReplicationMonitor
  pendingReplications.start();
  // Activate the DatanodeManager: starts DecommissionManager#Monitor
  // and HeartbeatManager#Monitor
  datanodeManager.activate(conf);
  this.replicationThread.setName("ReplicationMonitor");
  // Start BlockManager#ReplicationMonitor
  this.replicationThread.start();
  // Start the block report processing thread (part of the heartbeat machinery)
  this.blockReportThread.start();
  mxBeanName = MBeans.register("NameNode", "BlockStats", this);
  bmSafeMode.activate(blockTotal);
}
```
The NameNode's main responsibility is managing file metadata and the file-to-block mapping. Accordingly, its startup flow centers on the worker threads that communicate with clients and DataNodes, the file metadata management machinery, and the block management machinery. Among these, the RpcServer handles communication with clients and DataNodes, while FSDirectory manages file metadata.
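As a pointer for further reading, createRpcServer() simply constructs a NameNodeRpcServer; inside its constructor, the client-facing and DataNode-facing protocol implementations are registered on the RPC endpoints. Abridged:

```java
// Abridged from NameNode.java
protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
  return new NameNodeRpcServer(conf, this);
}

// Inside NameNodeRpcServer's constructor (heavily abridged), protocol
// implementations are registered on the RPC server(s), for example:
//   ClientNamenodeProtocol -- client <-> NameNode (create, addBlock, ...)
//   DatanodeProtocol       -- DataNode <-> NameNode (registerDatanode,
//                             sendHeartbeat, blockReport, ...)
```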
DataNode Startup Flow

The DataNode's main class is DataNode; start from DataNode.main().
```java
public class DataNode extends ReconfigurableBase
    implements InterDatanodeProtocol, ClientDatanodeProtocol,
        TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {
  public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);

  static {
    HdfsConfiguration.init();
  }
  ...
  public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }
    secureMain(args, null);
  }
```
```java
public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    // Print the startup message
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    // Do the main work of creating the DataNode
    DataNode datanode = createDataNode(args, null, resources);
    if (datanode != null) {
      datanode.join();
    } else {
      errorCode = 1;
    }
  } catch (Throwable e) {
    LOG.error("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    // We need to terminate the process here because either shutdown was called
    // or some disk related conditions like volumes tolerated or volumes required
    // condition was not met. Also, In secure mode, control will go to Jsvc
    // and Datanode process hangs if it does not exit.
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}
```
```java
@VisibleForTesting
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Instantiate the DataNode object; this starts many worker threads
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // Start the remaining worker threads
    dn.runDatanodeDaemon();
  }
  return dn;
}
```
```java
public void runDatanodeDaemon() throws IOException {
  // Also invoked during DataNode.instantiateDataNode() (see below)
  blockPoolManager.startAll();

  // start dataXceiveServer
  dataXceiverServer.start();
  if (localDataXceiverServer != null) {
    localDataXceiverServer.start();
  }
  ipcServer.setTracer(tracer);
  ipcServer.start();
  startPlugins(getConf());
}
```
```java
public static DataNode instantiateDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();

  // Argument checks, etc.
  if (args != null) {
    // parse generic hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
    args = hParser.getRemainingArgs();
  }

  if (!parseArguments(args, conf)) {
    printUsage(System.err);
    return null;
  }
  // Get the disk paths where the DataNode stores its data
  Collection<StorageLocation> dataLocations = getStorageLocations(conf);
  UserGroupInformation.setConfiguration(conf);
  SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
      DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf));
  return makeInstance(dataLocations, conf, resources);
}
```
```java
// DataNode.makeInstance() begins creating the DataNode
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  List<StorageLocation> locations;
  StorageLocationChecker storageLocationChecker =
      new StorageLocationChecker(conf, new Timer());
  try {
    // Check the permissions of the data directories
    locations = storageLocationChecker.check(conf, dataDirs);
  } catch (InterruptedException ie) {
    throw new IOException("Failed to instantiate DataNode", ie);
  }
  // Initialize the DataNode metrics system
  DefaultMetricsSystem.initialize("DataNode");

  assert locations.size() > 0 : "number of data directories should be > 0";
  return new DataNode(conf, locations, storageLocationChecker, resources);
}
```
```java
DataNode(final Configuration conf, final List<StorageLocation> dataDirs,
    final SecureResources resources) throws IOException {
  super(conf);
  ... // parameter setup
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is " + hostName);
    // Start the DataNode
    startDataNode(conf, dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ...
}
```
```java
void startDataNode(Configuration conf, List<StorageLocation> dataDirs,
    SecureResources resources) throws IOException {
  ... // parameter setup
  // Initialize DataStorage
  storage = new DataStorage();

  // global DN settings
  // Register JMX
  registerMXBean();
  // Initialize DataXceiver (streaming communication); started later in
  // DataNode.runDatanodeDaemon()
  initDataXceiver(conf);
  // Start the InfoServer (Web UI)
  startInfoServer(conf);
  // Start the JvmPauseMonitor (monitors JVM pauses; queryable via JMX)
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();

  ... // elided

  // Initialize the IpcServer (RPC communication); started later in
  // DataNode.runDatanodeDaemon()
  initIpcServer(conf);

  metrics = DataNodeMetrics.create(conf, getDisplayName());
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  // Initialize along the namespace (nameservice) / namenode structure
  blockPoolManager = new BlockPoolManager(this);
  blockPoolManager.refreshNamenodes(conf);

  ... // elided
}
```
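Among these calls, initDataXceiver() deserves a closer look: it wires up the streaming server that serves block read/write requests. It is deliberately not a Hadoop RPC server, because block transfer needs long-lived TCP streams. A heavily abridged sketch (socket and thread-group details elided):

```java
// Heavily abridged sketch based on DataNode.initDataXceiver()
private void initDataXceiver(Configuration conf) throws IOException {
  TcpPeerServer tcpPeerServer;
  if (secureResources != null) {
    // In secure mode the privileged listening socket was opened by jsvc
    tcpPeerServer = new TcpPeerServer(secureResources);
  } else {
    tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
        DataNode.getStreamingAddr(conf), backlogLength);
  }
  ...
  streamingAddr = tcpPeerServer.getStreamingAddr();
  // The server is wrapped in a daemon thread but NOT started here;
  // runDatanodeDaemon() starts it later
  xserver = new DataXceiverServer(tcpPeerServer, conf, this);
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  ...
}
```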
```java
// BlockPoolManager abstracts the block storage service the DataNode provides.
// It is organized along the namespace (nameservice) / namenode structure.
// BlockPoolManager#refreshNamenodes():
// besides being called during initialization, it can also be triggered by a
// refresh command the namespace delivers through the DataNode heartbeat.
void refreshNamenodes(Configuration conf)
    throws IOException {
  LOG.info("Refresh request received for nameservices: " +
      conf.get(DFSConfigKeys.DFS_NAMESERVICES));

  Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
      .getNNServiceRpcAddressesForCluster(conf);
  Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = DFSUtil
      .getNNLifelineRpcAddressesForCluster(conf);

  synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}
```
```java
private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
  assert Thread.holdsLock(refreshNamenodesLock);

  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;

  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        toAdd.add(nameserviceId);
      }
    }

    ... // elided

    // Step 2. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));

      for (String nsToAdd : toAdd) {
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
        ArrayList<InetSocketAddress> lifelineAddrs =
            Lists.newArrayList(lifelineAddrMap.get(nsToAdd).values());
        // Create a BPOfferService for each namespace
        BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    // Then start all BPOfferServices via startAll()
    startAll();
  }

  ... // elided
}
```
```java
protected BPOfferService createBPOS(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}

BPOfferService(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs,
    DataNode dn) {
  Preconditions.checkArgument(!nnAddrs.isEmpty(),
      "Must pass at least one NN.");
  Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
      "Must pass same number of NN addresses and lifeline addresses.");
  this.nameserviceId = nameserviceId;
  this.dn = dn;
  // One BPServiceActor per NameNode of this nameservice
  for (int i = 0; i < nnAddrs.size(); ++i) {
    this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
        lifelineNnAddrs.get(i), this));
  }
}
```
```java
// BlockPoolManager#startAll() starts all BPOfferServices
// (in effect, all BPServiceActors).
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}
```
```java
// This must be called only by blockPoolManager
void start() {
  for (BPServiceActor actor : bpServices) {
    actor.start();
  }
}
```
The DataNode startup main flow launches many kinds of worker threads, including the InfoServer, JvmPauseMonitor, and BPServiceActor threads. The most important is the BPServiceActor thread: it is the BPServiceActor that truly represents the DataNode in its communication with the NameNode.
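To see why, look at BPServiceActor.run(): the thread first registers its DataNode with the NameNode, then loops in offerService(), sending heartbeats and block reports and executing whatever commands the NameNode returns. A heavily abridged sketch (state bookkeeping and cleanup elided):

```java
// Heavily abridged sketch based on BPServiceActor.run()
@Override
public void run() {
  LOG.info(this + " starting to offer service");
  try {
    while (true) {
      try {
        // Handshake: check versions, then register with the NameNode
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // The NameNode may not be up yet; sleep and retry
        sleepAndLogInterrupts(5000, "initializing");
      }
    }
    ...
    while (shouldRun()) {
      try {
        // Main loop: heartbeats, incremental/full block reports, and
        // execution of the commands the NameNode sends back
        offerService();
      } catch (Exception ex) {
        sleepAndLogInterrupts(5000, "offering service");
      }
    }
  } catch (Throwable ex) {
    LOG.warn("Unexpected exception in block pool " + this, ex);
  }
  ...
}
```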
How the NameNode Supports Highly Concurrent Access (the Double-Buffer Mechanism)

What problems does highly concurrent access to the NameNode run into?
From HDFS's metadata management mechanism, we know that every time a client asks the NameNode to modify a piece of metadata (for example, requesting to upload a file), the NameNode must write an edits log entry, which involves two steps: writing it to the local edits file on disk, and shipping it over the network to the JournalNode cluster (for a Hadoop HA cluster; study this together with ZooKeeper).

The difficulty of high concurrency lies mainly in multi-thread safety of the data and the efficiency of each operation.
Multi-thread safety

When writing the edits log, the NameNode follows one principle: every edit written to the edits log must carry a globally, monotonically increasing transaction ID (txid for short), so that the ordering of individual edits can be identified. To guarantee that each edit's txid is increasing, a synchronization lock must be taken: every thread that modifies metadata and needs to write an edit must queue for the lock and, once it holds it, generate the next txid, which numbers the edit it is about to write.
The resulting problem

If txid generation and the edits log disk write both happen inside the same locked code block, combining a synchronization lock with a disk write on every request is extremely time-consuming, as the hypothetical illustration below shows.
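A hypothetical illustration of that naive approach (made-up names, not Hadoop code):

```java
// Hypothetical illustration, NOT Hadoop code: txid generation is cheap,
// but the disk write sits inside the same lock, so every writer thread
// waits behind every other thread's disk I/O.
synchronized (editLogLock) {
  long txid = ++lastWrittenTxid;  // must be locked to stay ordered
  writeEditToDisk(txid, op);      // milliseconds of I/O under the lock
}
```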
HDFS's Optimized Solution

Serialization: a segmented lock

One after another, each thread makes its first acquisition of the lock, generates a monotonically increasing txid, and writes its edit into region 1 of the in-memory double buffer; then it immediately makes its first release of the lock. In that window, the next thread can in turn make its own first acquisition of the lock and write its edit into the memory buffer. (The lock is "segmented" because each thread takes it twice: a first time to write its edit into memory, shown in logEdit() below, and a second time around the sync, shown in logSync() further below.)
Disk writes: a double buffer

The program allocates two identical regions of memory. One, bufCurrent, is where incoming edits are written directly; the other is bufReady. Once the data written into bufCurrent reaches a threshold, the two regions are exchanged (swapped): region 1 and region 2 of the double buffer simply switch roles, as the sketch below shows. This ensures that the threads serving client write requests only operate on memory instead of synchronously writing to disk.
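The exchange itself is only a reference swap, which is why it is cheap enough to perform while holding the lock. Abridged from EditsDoubleBuffer, the class FSEditLog writes through:

```java
// Abridged from org.apache.hadoop.hdfs.server.namenode.EditsDoubleBuffer
private TxnBuffer bufCurrent;  // the buffer new edits are appended to
private TxnBuffer bufReady;    // the buffer currently being flushed to disk

public void setReadyToFlush() {
  assert isFlushed() : "previous data not flushed yet";
  TxnBuffer tmp = bufReady;
  bufReady = bufCurrent;
  bufCurrent = tmp;
}
```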
For the double-buffer source code, start from FSEditLog.java:
```java
void logEdit(final FSEditLogOp op) {
  boolean needsSync = false;  // whether this thread should trigger a sync
  synchronized (this) {
    assert isOpenForWrite() : "bad state: " + state;

    // If an automatic sync has been scheduled by another thread,
    // wait (in 1-second intervals) until it is done
    waitIfAutoSyncScheduled();

    // Check if it is time to schedule an automatic sync.
    // needsSync defaults to false; the buffers need to be exchanged when
    // 1) the write buffer is full, or 2) the default time period has elapsed
    needsSync = doEditTransaction(op);
    if (needsSync) {
      // bufCurrent is full: schedule the double-buffer flush
      isAutoSyncScheduled = true;
    }
  }

  // Sync the log if an automatic sync is required.
  if (needsSync) {
    logSync();  // flush the buffered edits to disk
  }
}
```
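logEdit() is only the first segment of the lock. The second segment lives in logSync(): exactly one thread swaps the buffers while holding the lock, then flushes bufReady to disk after releasing it, so the other threads can keep appending to bufCurrent in parallel; threads whose txid has already been synced simply return. A heavily abridged sketch (journal failure handling and metrics elided):

```java
// Heavily abridged sketch based on FSEditLog.logSync()
protected void logSync(long mytxid) {
  long syncStart = 0;
  boolean sync = false;
  EditLogOutputStream logStream = null;
  try {
    synchronized (this) {
      // Wait while another thread is already syncing
      while (mytxid > synctxid && isSyncRunning) {
        try {
          wait(1000);
        } catch (InterruptedException ie) {
        }
      }
      // If a concurrent sync already covered our txid, nothing to do
      if (mytxid <= synctxid) {
        return;
      }
      // This thread becomes the syncer: swap bufCurrent/bufReady while
      // holding the lock, then release it before touching the disk
      syncStart = txid;
      isSyncRunning = true;
      sync = true;
      editLogStream.setReadyToFlush();
      logStream = editLogStream;
    }
    // Flush bufReady WITHOUT holding the lock, so other threads can keep
    // writing their edits into bufCurrent concurrently
    logStream.flush();
  } catch (IOException e) {
    LOG.error("Could not sync enough journals to persistent storage", e);
  } finally {
    synchronized (this) {
      if (sync) {
        synctxid = syncStart;
        isSyncRunning = false;
      }
      this.notifyAll();  // wake the threads waiting above
    }
  }
}
```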