有多个按时间戳(201704171735、201704171740)排序的文件,需要将这些文件按天(20170417)合并:
/*
* 按天合并文件
*/
public class MergerDayFile {
private static final Log LOG = LogFactory.getLog(MergerDayFile.class);svg
/**
 * Entry point: loads the merge settings and merges the source files by day.
 *
 * <p>Reads the {@code dest}, {@code src1} and {@code url} properties from
 * {@code ./config/mergerFile.properties} and passes them to {@code mergerfile}.
 *
 * @param args command-line arguments; not used
 * @throws Exception if the properties file cannot be read, a required property is
 *         missing, or the merge itself fails
 */
public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    // try-with-resources closes the stream even when load() throws, and releases it
    // before the merge starts instead of holding it open for the whole run.
    try (FileInputStream in = new FileInputStream("./config/mergerFile.properties")) {
        properties.load(in);
    }

    String dest = properties.getProperty("dest");
    String src1 = properties.getProperty("src1");
    String url = properties.getProperty("url");
    // Fail early with a readable message rather than with a NullPointerException
    // raised later while building the URI or the paths.
    if (dest == null || src1 == null || url == null) {
        throw new IllegalStateException(
                "mergerFile.properties must define 'dest', 'src1' and 'url'");
    }

    // Merge files of the same day into the configured destination.
    mergerfile(dest, src1, url);
}
/**
 * Groups the entries of a source directory by day and merges each group.
 *
 * <p>Connects to the file system at {@code url} as user {@code "root"}, lists
 * {@code src1}, and walks the listing in the order returned. Consecutive entries whose
 * names yield the same day from {@code parseTime} form one group, and each group is
 * handed to {@code mergerData}.
 *
 * <p>NOTE(review): grouping only looks at neighbouring entries, so it assumes the
 * listing is ordered by name (timestamp) — confirm for the target file system.
 * Directories inside {@code src1} are not filtered out.
 *
 * @param destPath destination prefix passed through to {@code mergerData}
 * @param src1     directory whose entries are merged
 * @param url      URI of the file system to connect to
 * @throws Exception if the URI is invalid or the file system cannot be reached or listed
 */
private static void mergerfile(String destPath, String src1, String url) throws Exception {
    Configuration conf = new Configuration();
    // Close the file system even when listing or merging throws.
    try (FileSystem fs = FileSystem.get(new URI(url), conf, "root")) {
        FileStatus[] allFiles = fs.listStatus(new Path(src1));
        List<Path> mergerDay = new ArrayList<Path>();
        String batchDay = null;

        for (FileStatus file : allFiles) {
            Path path = file.getPath();
            String name = path.getName();
            String day = parseTime(name);
            if (day == null) {
                // No day can be derived from this name; skip it instead of failing
                // the whole run with a NullPointerException.
                LOG.warn("跳过无法解析日期的文件: " + name);
                continue;
            }
            // A new day starts: write out what has been collected for the previous one.
            if (!mergerDay.isEmpty() && !day.equals(batchDay)) {
                mergerData(destPath, mergerDay, fs);
                mergerDay.clear();
            }
            mergerDay.add(path);
            batchDay = day;
        }

        // Flush the final group. Doing this once after the loop keeps the last file
        // together with the rest of its day and also covers a single-file listing.
        if (!mergerDay.isEmpty()) {
            mergerData(destPath, mergerDay, fs);
        }
    }
}
/**
 * Concatenates the given files, in list order, into a single output file.
 *
 * <p>The output path is {@code destPath} concatenated with the name of the first file
 * in the list; no separator is inserted, so {@code destPath} presumably has to end
 * with one — confirm against the configuration. I/O failures are logged and not
 * rethrown, so the caller can continue with the next group.
 *
 * @param destPath  prefix of the output path
 * @param mergerDay files to concatenate; nothing is done when the list is empty
 * @param fs        file system used for both reading and writing
 */
private static void mergerData(String destPath, List<Path> mergerDay, FileSystem fs) {
    if (mergerDay.isEmpty()) {
        return;
    }
    String name = mergerDay.get(0).getName();
    try {
        // try-with-resources releases both streams even when a copy fails midway.
        try (FSDataOutputStream out = fs.create(new Path(destPath + name))) {
            for (Path path : mergerDay) {
                try (FSDataInputStream open = fs.open(path)) {
                    // 'false': copyBytes must not close the shared output stream.
                    IOUtils.copyBytes(open, out, 8012, false);
                }
            }
        }
        // Report success only after the output stream has been closed, because
        // closing it can still fail.
        LOG.info(name + "合并成功");
    } catch (IOException e) {
        // The logger already records the stack trace; no printStackTrace() needed.
        LOG.error(name + "合并失败", e);
    }
}
/**
 * Extracts the day key from a file name.
 *
 * <p>The part of the name before the first {@code '.'} (or the whole name when there
 * is no dot) is taken, and its first eight characters are returned. The characters
 * are not checked to be digits.
 *
 * @param name file name, e.g. {@code "201704171735.log"}; may be {@code null}
 * @return the first eight characters of the leading name segment, or {@code null} when
 *         {@code name} is {@code null} or that segment is shorter than eight characters
 */
private static String parseTime(String name) {
    if (name == null) {
        return null;
    }
    int dot = name.indexOf('.');
    String date = dot >= 0 ? name.substring(0, dot) : name;
    // Explicit length check instead of catching the substring failure.
    return date.length() >= 8 ? date.substring(0, 8) : null;
}