This post gives a broad overview of the operations and classes involved in namenode startup; it will be expanded later as I study the code in more depth.
>Implementation Class
The HDFS startup script is $HADOOP_HOME/sbin/start-dfs.sh. Reading start-dfs.sh shows that the namenode is ultimately started through the bin/hdfs command, invoked via hadoop-daemons.sh:

$ vi start-dfs.sh

# namenodes
NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -namenodes)

echo "Starting namenodes on [$NAMENODES]"

"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
  --config "$HADOOP_CONF_DIR" \
  --hostnames "$NAMENODES" \
  --script "$bin/hdfs" start namenode $nameStartOpt
Looking at $HADOOP_HOME/bin/hdfs, we can find the Java class invoked to start the namenode:

$ vi bin/hdfs

if [ "$COMMAND" = "namenode" ] ; then
  CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
>Reading the Source Code
Following the steps in the earlier post, Hadoop 2.5.2 Study and Practice Notes (2): Compiling the Source Code and Importing It into Eclipse, the source has already been imported into Eclipse; press Ctrl+Shift+R to search for and open NameNode.java.
The NameNode class contains a static initializer block, which runs when the JVM initializes the NameNode class. HdfsConfiguration's init() method has an empty body; the call here serves as an active use of HdfsConfiguration, guaranteeing that if HdfsConfiguration has not yet been loaded and initialized by the time NameNode-related calls execute, its initialization is triggered first.
// org.apache.hadoop.hdfs.server.namenode.NameNode.java
static {
  // HdfsConfiguration's init() method is empty: public static void init() {}
  HdfsConfiguration.init();
}
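To see why calling an empty method is enough, here is a minimal, self-contained sketch of the same idiom (Holder and Demo are made-up names, not Hadoop classes): the JVM runs a class's static initializer exactly once, on its first active use, and invoking any static method counts as such a use.

// Holder stands in for HdfsConfiguration, Demo for NameNode.
class Holder {
  static {
    // Runs once, on the first active use of Holder; in Hadoop this is where
    // HdfsConfiguration adds its default resources and deprecated key mappings.
    System.out.println("Holder initialized");
  }
  // Deliberately empty: calling it forces the static block above to run.
  public static void init() {}
}

public class Demo {
  static {
    Holder.init(); // ensure Holder is initialized before Demo is used
  }
  public static void main(String[] args) {
    System.out.println("Demo running"); // prints after "Holder initialized"
  }
}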
Looking at the main method, we can see that the main entry point for namenode operations is the createNameNode(String argv[], Configuration conf) method.
// org.apache.hadoop.hdfs.server.namenode.NameNode.java
public static void main(String argv[]) throws Exception {
  if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  try {
    // log the namenode startup/shutdown message
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    // the main namenode operations happen in createNameNode
    NameNode namenode = createNameNode(argv, null);
    if (namenode != null) {
      // serve RPC requests from clients and datanodes until the RPC server stops
      namenode.join();
    }
  } catch (Throwable e) {
    LOG.fatal("Exception in namenode join", e);
    terminate(1, e);
  }
}
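Why main ends with join(): the constructed NameNode serves on background threads, so the main thread simply blocks until the service stops. A minimal sketch of the same create-then-join pattern with a plain thread (JoinDemo is a made-up illustration, not Hadoop code):

public class JoinDemo {
  public static void main(String[] args) throws InterruptedException {
    Thread service = new Thread(() -> {
      try {
        Thread.sleep(100); // stand-in for serving RPC requests
      } catch (InterruptedException ignored) {
      }
    });
    service.start();
    service.join(); // returns only once the service thread has exited
    System.out.println("service stopped; main can now exit");
  }
}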
The createNameNode method uses a switch statement to carry out a different operation for each command. For example, the FORMAT branch handles formatting the file system, which is what runs when you call namenode -format while setting up an environment.
// org.apache.hadoop.hdfs.server.namenode.NameNode.java
public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(argv));
  if (conf == null)
    conf = new HdfsConfiguration();
  // defaults to -regular when no argument is given
  StartupOption startOpt = parseArguments(argv);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);

  switch (startOpt) {
    case FORMAT: {
      // format the file system; the namenode -format command used when
      // setting up a pseudo-distributed environment ends up here
      boolean aborted = format(conf, startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid javac warning
    }
    case GENCLUSTERID: {
      System.err.println("Generating new cluster id:");
      System.out.println(NNStorage.newClusterID());
      terminate(0);
      return null;
    }
    case FINALIZE: {
      System.err.println("Use of the argument '" + StartupOption.FINALIZE +
          "' is no longer supported. To finalize an upgrade, start the NN " +
          " and then run `hdfs dfsadmin -finalizeUpgrade'");
      terminate(1);
      return null; // avoid javac warning
    }
    case ROLLBACK: {
      boolean aborted = doRollback(conf, true);
      terminate(aborted ? 1 : 0);
      return null; // avoid warning
    }
    case BOOTSTRAPSTANDBY: {
      String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
      int rc = BootstrapStandby.run(toolArgs, conf);
      terminate(rc);
      return null; // avoid warning
    }
    case INITIALIZESHAREDEDITS: {
      boolean aborted = initializeSharedEdits(conf,
          startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid warning
    }
    case BACKUP:
    case CHECKPOINT: {
      // startup of a backup node or checkpoint node
      NamenodeRole role = startOpt.toNodeRole();
      DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
      // BackupNode extends NameNode, so ultimately the NameNode constructor still runs
      return new BackupNode(conf, role);
    }
    case RECOVER: {
      NameNode.doRecovery(startOpt, conf);
      return null;
    }
    case METADATAVERSION: {
      printMetadataVersion(conf);
      terminate(0);
      return null; // avoid javac warning
    }
    default: {
      DefaultMetricsSystem.initialize("NameNode");
      // on a normal start startOpt is -regular, so the default branch runs
      // and a NameNode instance is returned via the constructor
      return new NameNode(conf);
    }
  }
}
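Note the pattern in the terminal branches: terminate() never returns, but javac cannot know that, so each branch keeps an unreachable return null to satisfy the compiler and prevent fall-through. A minimal sketch of the same dispatch style (MiniOption and MiniNode are made-up names, not Hadoop code):

// Each terminal branch exits the JVM; the trailing return only placates javac.
enum MiniOption { FORMAT, REGULAR }

class MiniNode {
  static MiniNode create(MiniOption opt) {
    switch (opt) {
      case FORMAT:
        System.out.println("formatting, then exiting");
        System.exit(0);  // the real code calls terminate(...)
        return null;     // unreachable; prevents fall-through into default
      default:
        return new MiniNode(); // the normal -regular startup path
    }
  }
  public static void main(String[] args) {
    System.out.println(create(MiniOption.REGULAR));
  }
}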
The NameNode constructors:
// org.apache.hadoop.hdfs.server.namenode.NameNode.java
public NameNode(Configuration conf) throws IOException {
  this(conf, NamenodeRole.NAMENODE);
}

protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  this.conf = conf;
  this.role = role;
  // read fs.defaultFS and set the namenode address
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  // whether HA is enabled
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  // HA state: active or standby
  state = createHAState(getStartupOption(conf));
  // read dfs.ha.allow.stale.reads: whether the namenode allows reads while
  // in standby state; defaults to false
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    // in a federated setup, this configures the set of namenodes grouped
    // together under one logical nsId
    initializeGenericKeys(conf, nsId, namenodeId);
    // namenode initialization
    initialize(conf);
    try {
      haContext.writeLock();
      state.prepareToEnterState(haContext);
      // enter the corresponding state: active / backup / standby
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stop();
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stop();
    throw e;
  }
}
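setClientNamenodeAddress derives the client-facing address from fs.defaultFS. A minimal sketch of reading that key with the public Configuration/FileSystem API (the URI value is a sample; this only mirrors the idea, not the actual method body):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class DefaultFsProbe {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); // sample value
    URI uri = FileSystem.getDefaultUri(conf);
    // host:port is what clients use to reach the namenode's RPC server
    System.out.println(uri.getHost() + ":" + uri.getPort());
  }
}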
The namenode initialize method:
// org.apache.hadoop.hdfs.server.namenode.NameNode.java
protected void initialize(Configuration conf) throws IOException {
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
    String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
    if (intervals != null) {
      conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
    }
  }
  // set up security: the authentication method and rules are taken from
  // hadoop.security.authentication
  UserGroupInformation.setConfiguration(conf);
  // login: if the authentication method is simple, the call returns immediately;
  // otherwise UserGroupInformation.loginUserFromKeytab is called to log in,
  // using dfs.namenode.kerberos.principal as the username
  loginAsNameNodeUser(conf);
  // initialize the metrics system used to measure the namenode's service state
  NameNode.initMetrics(conf, this.getRole());
  StartupProgressMetrics.register(startupProgress);

  if (NamenodeRole.NAMENODE == role) {
    // start the HTTP server
    startHttpServer(conf);
  }
  // act on the namespace according to the startup option: on a normal start,
  // as described earlier, load the local namespace image and replay the edit
  // log to build the in-memory image of the namespace
  loadNamesystem(conf);

  // create the RPC server
  rpcServer = createRpcServer(conf);
  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using
    // the RPC server's bind address.
    clientNamenodeAddress = NetUtils.getHostPortString(rpcServer.getRpcAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  // start the services common to the active and standby states:
  // the RPC server and any namenode plugins
  startCommonServices(conf);
}
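The security steps at the top hinge on hadoop.security.authentication, which defaults to simple. A minimal sketch probing that decision with the public UserGroupInformation API (assumes hadoop-common on the classpath; an illustration, not code from NameNode):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class AuthProbe {
  public static void main(String[] args) throws Exception {
    // hadoop.security.authentication defaults to "simple"
    Configuration conf = new Configuration();
    UserGroupInformation.setConfiguration(conf);
    // false under simple auth; true when kerberos is configured
    System.out.println("security enabled: " + UserGroupInformation.isSecurityEnabled());
    System.out.println("current user: " + UserGroupInformation.getCurrentUser());
  }
}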
The loadNamesystem(Configuration conf) method calls FSNamesystem's loadFromDisk(Configuration conf). As mentioned earlier, loading the image from the local file system and replaying the edit log at namenode startup are both implemented in this method.
// org.apache.hadoop.hdfs.server.namenode.FSNamesystem.java
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  // check that the required edit log directories are configured
  checkConfiguration(conf);
  // set up NNStorage and initialize the edit log directories; NNStorage's
  // main job is managing the storage directories used by the namenode
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  // create the FSNamesystem object from the given image
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  if (startOpt == StartupOption.RECOVER) {
    namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  long loadStart = now();
  try {
    // loading the image, replaying the edit log, and opening a new edits
    // file all happen inside this method
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  long timeTakenToLoadFSImage = now() - loadStart;
  LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
  NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
  if (nnMetrics != null) {
    nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
  }
  return namesystem;
}
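The fsimage/edit-log split is the classic checkpoint-plus-log recovery scheme: the namespace is rebuilt by loading the last checkpoint and replaying the operations logged since. A toy sketch of that idea (all names made up; nothing here is Hadoop code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplayDemo {
  public static void main(String[] args) {
    // "fsimage": the namespace as of the last checkpoint
    List<String> namespace = new ArrayList<>(Arrays.asList("/a", "/b"));
    // "edits": operations recorded after the checkpoint, replayed in order
    for (String op : Arrays.asList("mkdir /c", "delete /b")) {
      String[] parts = op.split(" ");
      if (parts[0].equals("mkdir")) {
        namespace.add(parts[1]);
      } else if (parts[0].equals("delete")) {
        namespace.remove(parts[1]);
      }
    }
    System.out.println(namespace); // [/a, /c]
  }
}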
private void loadFSImage(StartupOption startOpt) throws IOException {
  final FSImage fsImage = getFSImage();

  // format before starting up if requested
  if (startOpt == StartupOption.FORMAT) {
    fsImage.format(this, fsImage.getStorage().determineClusterId()); // reuse current id
    startOpt = StartupOption.REGULAR;
  }
  boolean success = false;
  writeLock();
  try {
    // We shouldn't be calling saveNamespace if we've come up in standby state.
    MetaRecoveryContext recovery = startOpt.createRecoveryContext();
    final boolean staleImage
        = fsImage.recoverTransitionRead(startOpt, this, recovery);
    if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt)) {
      rollingUpgradeInfo = null;
    }
    final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
    LOG.info("Need to save fs image? " + needToSave
        + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
        + ", isRollingUpgrade=" + isRollingUpgrade() + ")");
    if (needToSave) {
      fsImage.saveNamespace(this);
    } else {
      // No need to save, so mark the phase done.
      StartupProgress prog = NameNode.getStartupProgress();
      prog.beginPhase(Phase.SAVING_CHECKPOINT);
      prog.endPhase(Phase.SAVING_CHECKPOINT);
    }
    // This will start a new log segment and write to the seen_txid file, so
    // we shouldn't do it when coming up in standby state
    if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
      fsImage.openEditLogForWrite();
    }
    success = true;
  } finally {
    if (!success) {
      fsImage.close();
    }
    writeUnlock();
  }
  imageLoadComplete();
}
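writeLock()/writeUnlock() in FSNamesystem guard the namespace with an internal read-write lock, and the success flag ensures the image is closed only when the guarded section failed. A minimal sketch of the same success-flag pattern over a plain ReentrantReadWriteLock (a made-up class, for illustration only):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockPatternDemo {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  public void guardedUpdate(Runnable update, Runnable cleanupOnFailure) {
    boolean success = false;
    lock.writeLock().lock();
    try {
      update.run();
      success = true;
    } finally {
      if (!success) {
        cleanupOnFailure.run(); // mirrors fsImage.close() on failure
      }
      lock.writeLock().unlock();
    }
  }
}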

