// HDFS中Java的API使用测试 (usage tests for the HDFS Java API)

import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

/**
 * Demonstration helpers around the Hadoop {@link FileSystem} API: creating
 * directories and files, uploading, renaming, deleting, and inspecting
 * files, block locations and data nodes.
 *
 * <p>Every operation obtains its file system from a freshly constructed
 * default {@link Configuration} and reports its results on standard output.
 */
class TestHDFSFile {

        /**
         * Obtains the file system described by a new default
         * {@link Configuration}.
         *
         * @return the file system returned by {@code FileSystem.get}
         * @throws IOException if the file system cannot be obtained
         */
        private FileSystem openFileSystem() throws IOException {
                return FileSystem.get(new Configuration());
        }

        /**
         * Lists the directory that contains {@code child}. The root path has no
         * parent ({@code getParent()} returns {@code null}); in that case the
         * path itself is listed instead of passing {@code null} on.
         *
         * @param child path whose containing directory should be listed
         * @throws IOException if the listing fails
         */
        private void listContainingDirectory(Path child) throws IOException {
                Path parent = child.getParent();
                HdfsFileList(parent != null ? parent : child);
        }

        /**
         * Creates a directory unless the path already exists, then lists the
         * directory that contains it.
         *
         * @param hdfspath path of the directory to create
         * @throws IOException if the file system reports an error or
         *         {@code mkdirs} returns {@code false}
         */
        public void CreateFilePath(String hdfspath) throws IOException {
                FileSystem hdfs = openFileSystem();
                Path dir = new Path(hdfspath);
                if (hdfs.exists(dir)) {
                        System.out.println("文件目录已存在");
                } else if (!hdfs.mkdirs(dir)) {
                        // A false result means nothing was created; do not carry on
                        // as though the directory now exists.
                        throw new IOException("Failed to create directory: " + dir);
                }

                // Show the containing directory so the result can be inspected.
                listContainingDirectory(dir);
        }

        /**
         * Prints the path of every entry directly under the given path, or a
         * notice when there are none.
         *
         * @param hdfspath path to list
         * @throws IOException if the listing fails
         */
        public void HdfsFileList(Path hdfspath) throws IOException {
                FileSystem hdfs = openFileSystem();
                FileStatus[] files = hdfs.listStatus(hdfspath);
                // NOTE(review): some older Hadoop releases appear to return null
                // rather than throwing for a missing path; treat that as empty.
                if (files == null || files.length == 0) {
                        System.out.println("该目录下没有任何文件");
                        return;
                }
                for (FileStatus file : files) {
                        System.out.println(file.getPath());
                }
        }

        /**
         * Copies a local file to the given destination and then lists the
         * destination path.
         *
         * @param localpath source path on the local file system
         * @param hdfspath destination path
         * @throws Exception if the copy or the listing fails
         */
        public void UploadFileToHDFS(String localpath, String hdfspath)
                        throws Exception {
                FileSystem hdfs = openFileSystem();
                Path src = new Path(localpath);
                Path dst = new Path(hdfspath);
                hdfs.copyFromLocalFile(src, dst);

                // Print what is now at the destination.
                HdfsFileList(dst);
        }

        /**
         * Reads one line from standard input, writes it to a newly created file
         * as UTF-8, and then lists the directory that contains the file.
         *
         * <p>Failures while creating or writing the file are propagated to the
         * caller instead of being printed and ignored.
         *
         * @param hdfspath path of the file to create
         * @throws Exception if reading the input, creating, writing or closing
         *         the file, or the listing fails
         */
        public void CreateFile(String hdfspath) throws Exception {
                // Deliberately not closed: closing the Scanner would also close
                // System.in for the rest of the program.
                Scanner sc = new Scanner(System.in);
                System.out.println("请输入一行字符串");
                String src = sc.nextLine();

                FileSystem hdfs = openFileSystem();
                Path dst = new Path(hdfspath);
                FSDataOutputStream outputStream = hdfs.create(dst);
                try {
                        // writeBytes(String) keeps only the low byte of each char and
                        // would corrupt non-ASCII text, so encode explicitly.
                        outputStream.write(src.getBytes("UTF-8"));
                } finally {
                        outputStream.close();
                }

                listContainingDirectory(dst);
        }

        /**
         * Renames a path, prints whether the rename succeeded, and lists the
         * directory that contains the new name.
         *
         * @param oldname existing path
         * @param newname new path
         * @throws Exception if the rename or the listing fails
         */
        public void RenameFileName(String oldname, String newname) throws Exception {
                FileSystem hdfs = openFileSystem();
                Path frpath = new Path(oldname);
                Path topath = new Path(newname);

                // rename reports failure through its return value; surface it the
                // same way DelHDFSFiles reports the result of delete.
                boolean ok = hdfs.rename(frpath, topath);
                System.out.println(ok ? "重命名成功" : "重命名失败");

                listContainingDirectory(topath);
        }

        /**
         * Recursively deletes a file or directory, prints whether the deletion
         * succeeded, and lists the directory that contained it. Prints a notice
         * and returns without deleting when the path does not exist.
         *
         * @param hdfspath path to delete
         * @throws Exception if the deletion or the listing fails
         */
        public void DelHDFSFiles(String hdfspath) throws Exception {
                FileSystem hdfs = openFileSystem();
                Path topath = new Path(hdfspath);
                if (!hdfs.exists(topath)) {
                        System.out.println("文件不存在");
                        return;
                }

                boolean ok = hdfs.delete(topath, true);
                System.out.println(ok ? "删除成功" : "删除失败");

                listContainingDirectory(topath);
        }

        /**
         * Prints the path and last-modification time of every entry returned by
         * listing the given path.
         *
         * @param hdfspath file or directory to inspect
         * @throws Exception if the listing fails
         */
        public void GetFileModifyTime(String hdfspath) throws Exception {
                FileSystem hdfs = openFileSystem();
                Path dst = new Path(hdfspath);

                FileStatus[] files = hdfs.listStatus(dst);
                if (files == null) {
                        return;
                }
                for (FileStatus file : files) {
                        // getModificationTime() is a long timestamp; wrap it in a Date
                        // for a readable form.
                        System.out.println(file.getPath() + "\t"
                                        + new Date(file.getModificationTime()));
                }
        }

        /**
         * Checks whether a path exists and prints the result.
         *
         * @param hdfspath path to check
         * @return {@code true} if the path exists
         * @throws Exception if the check fails
         */
        public boolean IsExists(String hdfspath) throws Exception {
                FileSystem hdfs = openFileSystem();
                Path dst = new Path(hdfspath);

                boolean ok = hdfs.exists(dst);
                System.out.println(ok ? "文件存在" : "文件不存在");
                return ok;
        }

        /**
         * Prints, for every block of the file, its hosts, its names and its
         * length, separated by tabs.
         *
         * @param hdfspath path of the file to inspect
         * @throws Exception if the file status or block locations cannot be read
         */
        public void FileBlockLocation(String hdfspath) throws Exception {
                FileSystem hdfs = openFileSystem();
                Path dst = new Path(hdfspath);

                FileStatus fileStatus = hdfs.getFileStatus(dst);
                // Ask for the whole file: offset 0 through the file length.
                BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus,
                                0, fileStatus.getLen());
                for (BlockLocation block : blockLocations) {
                        System.out.println(Arrays.toString(block.getHosts()) + "\t"
                                        + Arrays.toString(block.getNames()) + "\t"
                                        + block.getLength());
                }
        }

        /**
         * Prints the host name and name of every data node reported by the
         * cluster.
         *
         * @throws ClassCastException if the configured file system is not a
         *         {@link DistributedFileSystem}
         * @throws Exception if the data node report cannot be obtained
         */
        public void GetHostName() throws Exception {
                // Data node statistics are only exposed by DistributedFileSystem,
                // hence the downcast.
                DistributedFileSystem hdfs = (DistributedFileSystem) openFileSystem();
                DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

                for (DatanodeInfo dataNode : dataNodeStats) {
                        System.out.println(dataNode.getHostName() + "\t"
                                        + dataNode.getName());
                }
        }
}