| Method from org.apache.hadoop.dfs.DataNode Detail: |
/**
 * Parses the command-line arguments into a configuration and starts a
 * datanode from it.
 *
 * @param args command-line arguments
 * @param conf configuration to populate; a new one is created when null
 * @return the datanode returned by {@code run(Configuration)} (which may
 *         itself be null), or null if the arguments were rejected, in which
 *         case usage is printed
 * @throws IOException propagated from {@code run(Configuration)}
 */
static DataNode createDataNode(String[] args,
                               Configuration conf) throws IOException {
  Configuration config = (conf != null) ? conf : new Configuration();
  boolean argumentsValid = parseArguments(args, config);
  if (argumentsValid) {
    return run(config);
  }
  printUsage();
  return null;
}
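Start a single datanode daemon after parsing the command-line arguments (if they are invalid, usage is printed and null is returned).
The daemon runs on its own thread; waiting for it to finish is done with join(), which stops waiting if the calling thread is specifically interrupted. |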
public static InetSocketAddress createSocketAddr(String target) throws IOException {
int colonIndex = target.indexOf(':");
if (colonIndex < 0) {
throw new RuntimeException("Not a host:port pair: " + target);
}
String hostname;
int port;
if (!target.contains("/")) {
// must be the old style < host >:< port >
hostname = target.substring(0, colonIndex);
port = Integer.parseInt(target.substring(colonIndex + 1));
} else {
// a new uri
URI addr = new Path(target).toUri();
hostname = addr.getHost();
port = addr.getPort();
}
return new InetSocketAddress(hostname, port);
}
Utility method to build a socket address from either:
<host>:<port>
<fs>://<host>:<port>/<path> |
/**
 * Returns the DataNode instance held in the static {@code datanodeObject}
 * field (may be null if it has not been set).
 */
public static DataNode getDataNode() {
  final DataNode current = DataNode.datanodeObject;
  return current;
}
Return the DataNode object |
/**
 * Returns the namenode address stored in the static {@code nameNodeAddr}
 * field.
 */
public InetSocketAddress getNameNodeAddr() {
  final InetSocketAddress address = DataNode.nameNodeAddr;
  return address;
}
|
/**
 * Returns an identifier for the namenode. Currently this is a fixed
 * placeholder string, not a value obtained from the namenode.
 */
public String getNamenode() {
  final String placeholder = "< namenode >";
  return placeholder;
}
Return the namenode's identifier. Currently this is a fixed placeholder string rather than the real identifier. |
/**
 * Reads the datanode startup option from {@code dfs.datanode.startup},
 * defaulting to {@code StartupOption.REGULAR}.
 *
 * @param conf configuration to read from
 * @return the parsed startup option
 */
static StartupOption getStartupOption(Configuration conf) {
  String fallback = StartupOption.REGULAR.toString();
  String configured = conf.get("dfs.datanode.startup", fallback);
  return StartupOption.valueOf(configured);
}
|
/**
 * Waits for the datanode thread held in the static {@code dataNodeThread}
 * field to finish. Returns immediately if that field is null.
 *
 * <p>If the calling thread is interrupted while waiting, it stops waiting
 * and its interrupt status is restored, so the caller can still observe the
 * interruption (previously the interrupt was silently discarded).
 */
void join() {
  if (dataNodeThread != null) {
    try {
      dataNodeThread.join();
    } catch (InterruptedException e) {
      // Stop waiting, but keep the interrupt visible to the caller.
      Thread.currentThread().interrupt();
    }
  }
}
|
/**
 * Command-line entry point: logs the startup message, creates a datanode
 * from the arguments and waits for it. Any throwable is logged and the JVM
 * exits with status -1.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) {
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    DataNode node = createDataNode(args, null);
    if (node == null) {
      return;
    }
    node.join();
  } catch (Throwable t) {
    LOG.error(StringUtils.stringifyException(t));
    System.exit(-1);
  }
}
|
/**
 * Creates a DataNode using every configured data directory that passes
 * {@code DiskChecker.checkDir}. Directories that fail the check are logged
 * and skipped.
 *
 * @param dataDirs paths configured in {@code dfs.data.dir}; null (property
 *                 not set) is treated as no directories instead of throwing
 *                 NullPointerException
 * @param conf     configuration passed to the DataNode constructor
 * @return the new DataNode, or null if no usable directory remains
 * @throws IOException propagated from the DataNode constructor
 */
static DataNode makeInstance(String[] dataDirs,
                             Configuration conf) throws IOException {
  if (dataDirs == null) {
    LOG.error("No directories configured in dfs.data.dir.");
    return null;
  }
  ArrayList<File> dirs = new ArrayList<File>();
  for (String dirName : dataDirs) {
    File data = new File(dirName);
    try {
      DiskChecker.checkDir(data);
      dirs.add(data);
    } catch (DiskErrorException e) {
      LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
    }
  }
  if (!dirs.isEmpty()) {
    return new DataNode(conf, dirs);
  }
  LOG.error("All directories in dfs.data.dir are invalid.");
  return null;
}
Make an instance of DataNode after ensuring that at least one of the
given data directories (and their parent directories, if necessary)
can be created. Directories that fail the check are logged and skipped; if none is usable, null is returned. |
/**
 * Main service loop. While {@code shouldRun} is true, this loop:
 * <ul>
 *   <li>sends a heartbeat to the namenode once {@code heartBeatInterval} has
 *       elapsed;</li>
 *   <li>reports newly received blocks;</li>
 *   <li>sends a full block report once {@code blockReportInterval} has
 *       elapsed.</li>
 * </ul>
 * Commands returned by the namenode are executed via {@code processCommand}.
 *
 * <p>If a RemoteException wraps UnregisteredDatanodeException or
 * DisallowedDatanodeException, this datanode calls {@code shutdown()} and the
 * method returns. Other RemoteExceptions and IOExceptions are logged and the
 * loop continues.
 *
 * @throws Exception any other exception (e.g. from processCommand) propagates
 */
public void offerService() throws Exception {
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
//
// Now loop for a long time....
//
while (shouldRun) {
try {
long now = System.currentTimeMillis();
//
// Every so often, send heartbeat or block-report
//
if (now - lastHeartbeat > heartBeatInterval) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
DatanodeCommand cmd = namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getRemaining(),
xmitsInProgress,
xceiverCount.getValue());
//LOG.info("Just sent heartbeat, with name " + localName);
lastHeartbeat = now;
// A false result skips the received-block and block-report steps
// below and restarts the loop (meaning of false is defined by
// processCommand — confirm there).
if (!processCommand(cmd))
continue;
}
// check if there are newly received blocks
// Snapshot the list under its lock; the blockReceived RPC below runs
// without holding the lock, and only the reported blocks are removed
// afterwards, so blocks added in the meantime stay queued.
Block [] blockArray=null;
synchronized(receivedBlockList) {
if (receivedBlockList.size() > 0) {
//
// Send newly-received blockids to namenode
//
blockArray = receivedBlockList.toArray(new Block[receivedBlockList.size()]);
}
}
if (blockArray != null) {
namenode.blockReceived(dnRegistration, blockArray);
synchronized (receivedBlockList) {
for(Block b: blockArray) {
receivedBlockList.remove(b);
}
}
}
// send block report
if (now - lastBlockReport > blockReportInterval) {
//
// Send latest blockinfo report if timer has expired.
// Get back a list of local block(s) that are obsolete
// and can be safely GC'ed.
//
DatanodeCommand cmd = namenode.blockReport(dnRegistration,
data.getBlockReport());
//
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
//
// NOTE(review): Random.nextInt throws IllegalArgumentException when
// (int) blockReportInterval <= 0 — confirm the interval is always positive.
if (lastBlockReport == 0) {
lastBlockReport = now - new Random().nextInt((int)(blockReportInterval));
} else {
lastBlockReport = now;
}
processCommand(cmd);
}
//
// There is no work to do; sleep until heartbeat timer elapses,
// or work arrives, and then iterate again.
//
// Waits on receivedBlockList's monitor; presumably the code that adds
// received blocks calls notify on it to wake this loop early — confirm.
long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedBlockList) {
if (waitTime > 0 && receivedBlockList.size() == 0) {
try {
receivedBlockList.wait(waitTime);
} catch (InterruptedException ie) {
// Interrupt is ignored; the while condition re-checks shouldRun.
}
}
} // synchronized
} catch(RemoteException re) {
String reClass = re.getClassName();
// The namenode rejected this datanode: stop for good.
if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass)) {
LOG.warn("DataNode is shutting down: " +
StringUtils.stringifyException(re));
shutdown();
return;
}
LOG.warn(StringUtils.stringifyException(re));
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
} // while (shouldRun)
}
Main loop for the DataNode. Runs until shutdown,
forever calling remote NameNode functions. |
/**
 * Thread entry point.
 *
 * <p>Starts the data-transfer server. Then, until {@code shouldRun} is
 * cleared, repeatedly starts a distributed upgrade if one is needed and runs
 * {@code offerService()}. Any exception is logged, and the loop pauses 5
 * seconds before retrying (unless shutdown has begun). Finally, it waits for
 * the data-transfer server thread to terminate.
 */
public void run() {
  LOG.info("In DataNode.run, data = " + data);

  dataXceiveServer.start();

  while (shouldRun) {
    try {
      startDistributedUpgradeIfNeeded();
      offerService();
    } catch (Exception failure) {
      LOG.error("Exception: " + StringUtils.stringifyException(failure));
      if (!shouldRun) {
        continue; // shutting down: no back-off, the loop condition exits
      }
      try {
        Thread.sleep(5000);
      } catch (InterruptedException ignored) {
        // Back-off cut short; the loop condition re-checks shouldRun.
      }
    }
  }

  // Wait for the data-transfer server to terminate.
  try {
    this.dataXceiveServer.join();
  } catch (InterruptedException ignored) {
    // Stop waiting and fall through to the final log message.
  }
  LOG.info("Finishing DataNode in: " + data);
}
No matter what kind of exception we get, keep retrying offerService(), pausing 5 seconds after each failure.
That is the loop that connects to the NameNode and provides basic DataNode
functionality.
Only stop when "shouldRun" is turned off (which can only happen at shutdown). |
/**
 * Creates a DataNode from the directories in {@code dfs.data.dir} and, if
 * one could be created, starts it on a daemon thread stored in the static
 * {@code dataNodeThread} field.
 *
 * @param conf configuration to read {@code dfs.data.dir} from
 * @return the started DataNode, or null if none could be created
 * @throws IOException propagated from {@code makeInstance}
 */
public static DataNode run(Configuration conf) throws IOException {
  String[] dirNames = conf.getStrings("dfs.data.dir");
  DataNode node = makeInstance(dirNames, conf);
  if (node == null) {
    return null;
  }
  String threadName = "DataNode: [" + StringUtils.arrayToString(dirNames) + "]";
  dataNodeThread = new Thread(node, threadName);
  dataNodeThread.setDaemon(true); // needed for JUnit testing
  dataNodeThread.start();
  return node;
}
|
/**
 * Reads a block and its checksum metadata file and streams them over
 * {@code sock}, one checksummed chunk per packet.
 *
 * <p><b>Client read</b> ({@code targets} is null):
 * <ul>
 *   <li>A status short is written first: {@code OP_STATUS_SUCCESS}, or
 *       {@code OP_STATUS_ERROR_INVALID} if the requested range does not fit
 *       in the block (in which case nothing else is sent).</li>
 *   <li>A missing checksum file is tolerated (a CHECKSUM_NULL checksum is
 *       used).</li>
 *   <li>A checksum read failure during streaming is also tolerated; zero
 *       bytes are sent in place of that chunk's checksum.</li>
 * </ul>
 *
 * <p><b>Replication</b> ({@code targets} is non-null):
 * <ul>
 *   <li>A write-block header listing {@code targets[1..]} is written
 *       (presumably {@code targets[0]} is the peer on {@code sock} —
 *       confirm).</li>
 *   <li>A bad range or checksum problems raise IOException.</li>
 * </ul>
 *
 * <p>The header is followed by the checksum header, then (client reads
 * only) the chunk-aligned start offset. Then comes a sequence of packets:
 * an int data length, that many data bytes, and checksumSize checksum bytes.
 * The last packet has data length 0.
 *
 * @param sock        socket whose output stream receives the data; the
 *                    DataOutputStream wrapping it is closed in the finally block
 * @param block       block to send
 * @param startOffset first byte of the block requested
 * @param length      bytes requested; a negative value means the whole block
 * @param targets     null for a client read, otherwise the replication targets
 * @return total bytes read from the block and checksum files, including
 *         checksum bytes (0 when a client request is rejected as invalid)
 * @throws IOException on I/O errors, or for replication when the range or
 *         checksum data is invalid
 */
long sendBlock(Socket sock,
Block block,
long startOffset,
long length,
DatanodeInfo[] targets) throws IOException {
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(sock.getOutputStream(),
BUFFER_SIZE));
RandomAccessFile blockInFile = null;
DataInputStream blockIn = null;
DataInputStream checksumIn = null;
long totalRead = 0;
/* XXX This will affect inter datanode transfers during
* a CRC upgrade. There should not be any replication
* during crc upgrade since we are in safe mode, right?
*/
// Only client reads may proceed with a missing or corrupt checksum.
boolean corruptChecksumOk = targets == null;
try {
File blockFile = data.getBlockFile( block );
blockInFile = new RandomAccessFile(blockFile, "r");
File checksumFile = FSDataset.getMetaFile( blockFile );
DataChecksum checksum = null;
// For replication the checksum file is always opened (a missing file
// raises FileNotFoundException); client reads fall back to CHECKSUM_NULL.
if ( !corruptChecksumOk || checksumFile.exists() ) {
checksumIn = new DataInputStream(
new BufferedInputStream(new FileInputStream(checksumFile),
BUFFER_SIZE));
//read and handle the common header here. For now just a version
// A version mismatch is only logged; streaming continues.
short version = checksumIn.readShort();
if ( version != FSDataset.METADATA_VERSION ) {
LOG.warn( "Wrong version (" + version +
") for metadata file for " + block + " ignoring ..." );
}
checksum = DataChecksum.newDataChecksum( checksumIn ) ;
} else {
LOG.warn( "Could not find metadata file for " + block );
// This only decides the buffer size. Use BUFFER_SIZE?
checksum = DataChecksum.newDataChecksum( DataChecksum.CHECKSUM_NULL,
16*1024 );
}
int bytesPerChecksum = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize();
if (length < 0) {
length = data.getLength(block);
}
long endOffset = data.getLength( block );
// Reject ranges that start or end outside the block.
if ( startOffset < 0 || startOffset > endOffset ||
(length + startOffset) > endOffset ) {
String msg = " Offset " + startOffset + " and length " + length +
" don't match block " + block + " ( blockLen " +
endOffset + " )";
LOG.warn( "sendBlock() : " + msg );
if ( targets != null ) {
throw new IOException(msg);
} else {
out.writeShort( OP_STATUS_ERROR_INVALID );
return totalRead;
}
}
// One chunk of data followed by its checksum.
byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
// Start at the beginning of the chunk containing startOffset, since
// checksums can only be verified for whole chunks.
long offset = (startOffset - (startOffset % bytesPerChecksum));
// length was made non-negative above (assuming data.getLength never
// returns a negative value), so this branch is expected to always run.
if ( length >= 0 ) {
// Make sure endOffset points to end of a checksumed chunk.
// NOTE(review): tmpLen adds (startOffset - offset) on top of
// startOffset + length, so the end can be rounded up by one chunk more
// than needed (still capped at the block length below). Confirm whether
// startOffset + length alone was intended before changing.
long tmpLen = startOffset + length + (startOffset - offset);
if ( tmpLen % bytesPerChecksum != 0 ) {
tmpLen += ( bytesPerChecksum - tmpLen % bytesPerChecksum );
}
if ( tmpLen < endOffset ) {
endOffset = tmpLen;
}
}
// seek to the right offsets
if ( offset > 0 ) {
// Each preceding chunk has checksumSize bytes in the checksum file.
long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
blockInFile.seek(offset);
if (checksumSkip > 0) {
//Should we use seek() for checksum file as well?
FileUtil.skipFully(checksumIn, checksumSkip);
}
}
// Buffered reader over the already-positioned RandomAccessFile descriptor.
blockIn = new DataInputStream(new BufferedInputStream(
new FileInputStream(blockInFile.getFD()),
BUFFER_SIZE));
if ( targets != null ) {
//
// Header info
//
out.writeShort( DATA_TRANFER_VERSION );
out.writeByte( OP_WRITE_BLOCK );
out.writeLong( block.getBlockId() );
out.writeInt(targets.length-1);
for (int i = 1; i < targets.length; i++) {
targets[i].write( out );
}
} else {
out.writeShort( OP_STATUS_SUCCESS );
}
checksum.writeHeader( out );
if ( targets == null ) {
// Tell the client where the (chunk-aligned) data actually starts.
out.writeLong( offset );
}
// Each iteration sends one chunk. The iteration where offset == endOffset
// computes len == 0, writes that terminating packet, and exits.
// NOTE(review): the terminating packet (and any chunk sent after
// checksumIn was nulled on a read failure) still writes checksumSize
// bytes taken from whatever is left in buf — confirm the reader ignores
// them.
while ( endOffset >= offset ) {
// Write one data chunk per loop.
int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
if ( len > 0 ) {
blockIn.readFully( buf, 0, len );
totalRead += len;
if ( checksumSize > 0 && checksumIn != null ) {
try {
checksumIn.readFully( buf, len, checksumSize );
totalRead += checksumSize;
} catch ( IOException e ) {
LOG.warn( " Could not read checksum for data at offset " +
offset + " for block " + block + " got : " +
StringUtils.stringifyException(e) );
// Stop reading checksums for the rest of this transfer.
FileUtil.closeStream( checksumIn );
checksumIn = null;
if ( corruptChecksumOk ) {
// Just fill the array with zeros.
Arrays.fill( buf, len, len + checksumSize, (byte)0 );
} else {
throw e;
}
}
}
}
out.writeInt( len );
out.write( buf, 0, len + checksumSize );
if ( offset == endOffset ) {
out.flush();
// We are not waiting for response from target.
break;
}
offset += len;
}
} finally {
FileUtil.closeStream( blockInFile );
FileUtil.closeStream( checksumIn );
FileUtil.closeStream( blockIn );
FileUtil.closeStream( out );
}
return totalRead;
}
sendBlock() is used to read block and its metadata and stream
the data to either a client or to another datanode.
If argument targets is null, then it is assumed to be replying
to a client request (OP_BLOCK_READ). Otherwise, we are replicating
to another datanode.
Returns the total number of bytes read, including checksum (CRC) bytes. |
/**
 * Shuts down this datanode. In order, it:
 * <ol>
 *   <li>stops the info server;</li>
 *   <li>clears {@code shouldRun};</li>
 *   <li>kills and interrupts the data-transfer server;</li>
 *   <li>stops any upgrade in progress;</li>
 *   <li>releases the storage locks;</li>
 *   <li>interrupts and waits for the datanode thread.</li>
 * </ol>
 * Failures while stopping the info server or unlocking storage are logged
 * and shutdown continues.
 *
 * <p>When called on the datanode thread itself (for example from
 * {@code offerService()} after the namenode rejects this node), that thread
 * is neither interrupted nor joined: a thread cannot wait for its own
 * termination, and clearing {@code shouldRun} already makes its loops exit.
 * (Previously it interrupted and then joined itself, which only returned
 * because the pending interrupt made {@code join} throw immediately.)
 *
 * <p>Otherwise this returns only after the datanode thread has terminated.
 * If the calling thread is interrupted while waiting, its interrupt status
 * is restored and the method returns early.
 */
public void shutdown() {
  if (infoServer != null) {
    try {
      infoServer.stop();
    } catch (Exception e) {
      // Best effort: keep shutting down the remaining services.
      LOG.warn("Exception shutting down DataNode info server: "
          + StringUtils.stringifyException(e));
    }
  }
  this.shouldRun = false;
  if (dataXceiveServer != null) {
    ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
    this.dataXceiveServer.interrupt();
  }
  if (upgradeManager != null) {
    upgradeManager.shutdownUpgrade();
  }
  if (storage != null) {
    try {
      this.storage.unlockAll();
    } catch (IOException ie) {
      LOG.warn("Exception unlocking DataNode storage: "
          + StringUtils.stringifyException(ie));
    }
  }
  Thread worker = dataNodeThread;
  if (worker != null && worker != Thread.currentThread()) {
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException ie) {
      // Stop waiting, but keep the interrupt visible to our caller.
      Thread.currentThread().interrupt();
    }
  }
}
Shut down this instance of the datanode.
Returns only after shutdown is complete. |
/**
 * Initializes this datanode. In order, it:
 * <ol>
 *   <li>resolves the local host name;</li>
 *   <li>connects to the namenode and performs the version handshake;</li>
 *   <li>recovers/locks the storage directories;</li>
 *   <li>opens the data-transfer server socket (trying successive ports
 *       starting at {@code dfs.datanode.port} until one binds);</li>
 *   <li>configures the heartbeat and block-report intervals;</li>
 *   <li>starts the HTTP info server;</li>
 *   <li>determines the network location;</li>
 *   <li>registers with the namenode.</li>
 * </ol>
 *
 * <p>Fixes relative to the earlier version:
 * <ul>
 *   <li>The bind address is resolved once, before the port loop. An
 *       unresolvable {@code dfs.datanode.bindAddress} now fails fast with
 *       UnknownHostException, instead of being retried on every port up to
 *       65535.</li>
 *   <li>The random block-report jitter is skipped when the configured
 *       interval is too small to produce a positive bound, where
 *       {@code Random.nextInt} would throw IllegalArgumentException.</li>
 * </ul>
 *
 * @param conf     datanode configuration
 * @param dataDirs data directories to use (passed to storage recovery)
 * @throws IOException on namenode connection, storage, address-resolution
 *         or info-server failures
 */
void startDataNode(Configuration conf,
                   AbstractList dataDirs) throws IOException {
  // use configured nameserver & interface to get local hostname
  machineName = DNS.getDefaultHost(
      conf.get("dfs.datanode.dns.interface", "default"),
      conf.get("dfs.datanode.dns.nameserver", "default"));
  InetSocketAddress nameNodeAddr = createSocketAddr(
      conf.get("fs.default.name", "local"));

  this.defaultBytesPerChecksum =
      Math.max(conf.getInt("io.bytes.per.checksum", 512), 1);

  int tmpPort = conf.getInt("dfs.datanode.port", 50010);
  storage = new DataStorage();
  // construct registration
  this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);

  // connect to name node
  this.namenode = (DatanodeProtocol)
      RPC.waitForProxy(DatanodeProtocol.class,
                       DatanodeProtocol.versionID,
                       nameNodeAddr,
                       conf);
  // get version and id info from the name-node
  NamespaceInfo nsInfo = handshake();

  // read storage info, lock data dirs and transition fs state if necessary
  StartupOption startOpt = getStartupOption(conf);
  assert startOpt != null : "Startup option must be set.";
  storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
  // adjust
  this.dnRegistration.setStorageInfo(storage);

  // initialize data node internal structure
  this.data = new FSDataset(storage, conf);

  // find free port
  String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
  // Resolve once: an unresolvable address is a configuration error, not a
  // busy port, so it must not be retried on successive ports.
  InetAddress bindInetAddress = InetAddress.getByName(bindAddress);
  ServerSocket ss = null;
  while (ss == null) {
    try {
      ss = new ServerSocket(tmpPort, 0, bindInetAddress);
      LOG.info("Opened server at " + tmpPort);
    } catch (IOException ie) {
      LOG.info("Could not open server at " + tmpPort + ", trying new port");
      tmpPort++;
    }
  }
  // adjust machine name with the actual port
  this.dnRegistration.setName(machineName + ":" + tmpPort);

  this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));

  long blockReportIntervalBasis =
      conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
  // Subtract up to 10% random jitter. Random.nextInt needs a positive int
  // bound, so clamp to int range and skip the jitter when it would be <= 0.
  int jitterBound =
      (int) Math.min(blockReportIntervalBasis / 10, Integer.MAX_VALUE);
  this.blockReportInterval = blockReportIntervalBasis
      - (jitterBound > 0 ? new Random().nextInt(jitterBound) : 0);
  this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
  DataNode.nameNodeAddr = nameNodeAddr;

  //create a servlet to serve full-file content
  int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
  String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
  this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
  this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
  this.infoServer.start();
  // adjust info port
  this.dnRegistration.setInfoPort(this.infoServer.getPort());
  // get network location
  this.networkLoc = conf.get("dfs.datanode.rack");
  if (networkLoc == null) { // exec network script or set the default rack
    networkLoc = getNetworkLoc(conf);
  }
  // register datanode
  register();
}
|
/**
 * Returns a short description of this datanode: its dataset, registration
 * name, storage ID and number of transmissions in progress.
 */
@Override
public String toString() {
  StringBuilder sb = new StringBuilder("DataNode{");
  sb.append("data=").append(data);
  sb.append(", localName='").append(dnRegistration.getName()).append("'");
  sb.append(", storageID='").append(dnRegistration.getStorageID()).append("'");
  sb.append(", xmitsInProgress=").append(xmitsInProgress);
  sb.append("}");
  return sb.toString();
}
|