# 如何单机排序一个大数据文件? ## **问题来源:** 针对一个大文件,如何对里面的元素进行排序 ## **问题描述:** 24GTxt文件,每行1个大整数,1-100位不等 纯JDK排序。 ## **解决方案:** > 程序流程 1. 源文件采用单线程NIO按行读 2. 读到的每一行入到队列A 3. 开启16个线程(根据CPU核数),去消费这个队列 4. 消费之后,把数据写入相关的文件待排序 5. 开启8个线程并发排序每个待排序文件(读进来,排序,写) 6. 按文件名做合并 ***** > 经验总结 1. 文件的读取先要看清楚是按行还是按字节。 如果按行读,不能用多线程,方法是读1个BUFFERED,判断结束是否是换行,如果不是,就按字节读,一直读到是换行为止,或者按BUFFERED读,然后按换行截取,剩下的就拼在下一个BUFFERED的头部。如果按字节读,可以用多线程(RandomAccessFile 2. 读和写,最好设置缓存大小。16M刚好 3. Eclipse运行的java程序是独立的JVM,如果内存不够,可以加参数-Xms3072m -Xmx6072m 4. 遇到高并发自增,可以采用AtomicInteger 5. ByteBuffer.array() 返回的 array 长度为 ByteBuffer allocate的长度,并不是里面所含的内容的长度 ``` //这样会导致,最后读取的肯定不是allocate的长度,但是array返回的带有上一次的冗余数据 //解决办法如下,重新按照剩余容量来制作一个新的byte byte[] data; if(buffer.remaining() != buffer.capacity()){ data = new byte[buffer.remaining()]; buffer.get(data, 0, data.length); }else{ data = buffer.array(); } String content = new String(data); ``` 6.如果中断线程池里面的线程 可以使用Pool.shutdown. 但是前提是线程里面有阻断方法。如Sleep或者阻塞队列等等。 7.对于阻塞队列,入队和出队所占用的时间比较长,做实时性的性能差,因为阻塞涉及到加锁 8.线程池不能设置setDaemon。如果线程池里面的线程读守候,那线程就无法回收了。矛盾 9.同1时刻,1个CPU运行1个或者多个线程,如8核两线程,那就是一共16个线程 ***** > 测试报告 * 运行结果 1. SSD 10分钟跑完24G 2. 机械硬盘 80分钟跑完24G * 程序启动使用内存 | | 32位JDK启动程序使用内存 | 64位JDK启动程序使用 | |---|---|---| | -Xms1g | 11M | 5M | | -Xms1.1g | 12M | | | -Xms1.2g | 报错 | | | -Xms2g | 报错 | 10M | | -Xms3g | 报错 | 15M | | -Xms5g | 报错 | 25M | | -Xms6g | 报错 | 30M | * BufferedWriter占用内存数(基于64位JDK,-Xms5g) ``` BufferedWriter bw = new BufferedWriter (new FileWriter(new File("D:\\temp\\bigdata\\des3g\\"+i+".txt")),内存大小); ``` * BufferedWriter 缓存 5M 每个对象大概占用10M | 创建对象数量 | 占用内存| |---|---| | 2 | 25M | | 3 | 35M | | 4 | 45M | | 500 | 1265M(GC) | * BufferedWriter 缓存 3M 每个对象大概占用6M | 创建对象数量 | 占用内存| |---|---| | 4 | 25M | | 5 | 31M | | 6 | 37M | | 500 | 507M 1265M (GC) | * BufferedWriter 缓存 1M 每个对象大概占用2M | 创建对象数量 | 占用内存| |---|---| | 12 | 25M | | 13 | 27M | | 14 | 29M | | 500 | 1006M(GC) | ***** > 程序代码 * 排序代码 ``` package com.bingo4; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; public class BigSort { /**************************************** 可配置项 ***********************************/ // 是否开启内存监控,每2秒打印内存情况 public boolean isRamMonitor = false; // 待排序文件 public String SRC_DATA = "d://temp//bigdata/src/100m.txt"; // 排序完毕生成的文件地址 public String DES_DATA_PATH = "d://temp//bigdata//des//"; // 默认开启,每1位数,就分发成10个待排序文件,如果待排序文件里面最大是60位数,就分发成600个待排序文件.源文件如果超过8G左右,必须开启,否则后面单个文件做排序会导致内存溢出 // 如果关闭,每1位数,就分发成1个待排序文件,这个对于源文件不大的情况下,速度极快。 public boolean isDeliverTen = true; // 读入待排序文件缓存 final static int BSIZE = 1024 * 1024 * 1; // 3M // 写入数据区间文件缓存 final static int WRITE_SORT_BSIZE = 1024 * 1024 * 3; // 3M // 排序读写缓存 final static int SORT_READER_BSIZE = 1024 * 1024 * 1; // 5M final static int SORT_WRITE_BSIZE = 1024 * 1024 * 1; // 5M // 合并读写缓存 final static int MERGE_BSIZE = 1024 * 1024 * 2; // 5M // 分发数据线程大小 public static int DELIVER_DATA_QUEUE_SIZE = 16; // 每个数据区间监听队列的线程数, 这里设置为1,效率最高 public static int RANG_QUEUE_SIZE = 1; // 并发排序线程数 public static int SORT_THREAD_SIZE = 8; /**************************************** 可配置项 ***********************************/ public String DES_SORT_DATA_PATH = DES_DATA_PATH + "sort//"; public String MERGE_FILE = DES_DATA_PATH + "merge//merge.txt"; public String MERGE_FILE_PATH = DES_DATA_PATH + "merge//"; int cpuNums = Runtime.getRuntime().availableProcessors(); // 分发数据队列 public ConcurrentLinkedQueue<String> deliverDataQueue = new ConcurrentLinkedQueue<String>(); // 分发数据线程的执行线程池 public ExecutorService deliverDataThreadES = Executors.newFixedThreadPool(DELIVER_DATA_QUEUE_SIZE); // 数据分布范围集合 public Map<Integer, ConcurrentLinkedQueue<String>> dataRangMap = new HashMap<Integer, ConcurrentLinkedQueue<String>>(); // 数据分布写入对象 public Map<Integer, BufferedWriter> dataWriteMap = new ConcurrentHashMap<Integer, BufferedWriter>(); // 数据区间线程池 public ExecutorService dataRangeThreadES = Executors.newFixedThreadPool(1); // CAS:将这个变量更新为新值,但是如果从我上次看到这个变量之后其他线程修改了它的值,那么更新就失败” // 已经读取完毕的数据行数 public AtomicInteger hasReaderDataLine = new AtomicInteger(0); // 通过多线程,已经按数据区间处理好的数据行数 public AtomicInteger hasDataRangeWriteLine = new AtomicInteger(0); // 已排序的总行数 public AtomicInteger hasSortedDataLine = new AtomicInteger(0); // 已经读到内存等待排序的总行数 public AtomicInteger hasWaitSortedDataLine = new AtomicInteger(0); // 已排序的文件数 public AtomicInteger hasSortedFile = new AtomicInteger(0); // 已合并好的文件数 public AtomicInteger hasCombineFile = new AtomicInteger(0); // 程序启动时间 public long startTime = 0l; // 读取文件完成时间 public long finishReadFileTime = 0l; // 等待分发完毕时间 public long finishDeliverFileTime = 0l; // 排序完成时间 public long finishSortFileTime = 0l; // 合并完成时间 public long finishCombineFileTime = 0l; // 内存监控线程 public Thread ramMonitorT = new Thread(new Runnable() { @Override public void run() { try { while (true) { Memory.print(); Thread.sleep(2000); } } catch (Exception e) { } } }); public static void main(String[] args) throws Exception { BigSort sort = new BigSort(); // 待排序文件 if ((args.length > 0) && !args[0].equals("")) { sort.SRC_DATA = args[0]; } // 目的文件 if ((args.length > 1) && !args[1].equals("")) { if (!args[1].endsWith("\\")) { sort.DES_DATA_PATH = args[1] + "\\"; } else { sort.DES_DATA_PATH = args[1]; } sort.DES_SORT_DATA_PATH = sort.DES_DATA_PATH + "sort//"; sort.MERGE_FILE = sort.DES_DATA_PATH + "merge//merge.txt"; sort.MERGE_FILE_PATH = sort.DES_DATA_PATH + "merge//"; } sort.start(); } /** * 程序启动入口 * * @throws Exception * */ public void start() throws Exception { System.out.println(String.format("CPU核心数[%s] 最大可用内存:[%sM] 初始化内存:[%sM]", cpuNums, Memory.getMaxHeapMemory() / 1024 / 1024, Memory.getInitHeapMemory() / 1024 / 1024)); Memory.print(); // 是否开启内存监控 if (isRamMonitor) { ramMonitorT.setDaemon(true); ramMonitorT.start(); } // 1.准备阶段 if (!prepare()) { return; } // 2.对源文件进行读取入队处理 readFile(new File(SRC_DATA)); // 3.等待分发数据线程把数据分发完毕,然后把线程池里面的线程全部终止 waitForFinishWriteDataRange(); System.gc(); // 4.对每个文件单独排序 sort(); // 5.合并 combine(); System.out.println(String.format("[程序已全部完成][一共用时:%s秒][读:%s秒,割:%s秒,排:%s秒,合:%s秒]", ((System.currentTimeMillis() - startTime) / 1000), finishReadFileTime, finishDeliverFileTime, finishSortFileTime, finishCombineFileTime)); System.out.println(String.format("[已排序完的文件在:%s]", MERGE_FILE)); } // 1.准备阶段,文件准备 public boolean prepare() { try { System.out.println("[文件及目录检查][开始]"); File srcFile = new File(SRC_DATA); if (!srcFile.exists()) { System.out.println("[文件及目录检查][失败][待排序文件不存在,程序结束]" + SRC_DATA); return false; } // 删掉已存在的临时文件 File desDataPath = new File(DES_DATA_PATH); // if(desDataPath.exists()){ // if(deleteDir(desDataPath)); // } // 创建目录 if (!desDataPath.exists()) { desDataPath.mkdir(); } // 创建目录 File desSortDataPath = new File(DES_SORT_DATA_PATH); if (!desSortDataPath.exists()) { desSortDataPath.mkdir(); } // 创建目录 File mergeFilePath = new File(MERGE_FILE_PATH); if (!mergeFilePath.exists()) { mergeFilePath.mkdir(); } File mergeFile = new File(MERGE_FILE); if (mergeFile.exists()) { mergeFile.delete(); } System.out.println(String.format("[文件及目录检查][待排序文件路径:%s]", SRC_DATA)); System.out.println(String.format("[文件及目录检查][排序完毕生成的文件地址:%s]", DES_DATA_PATH)); System.out.println("[文件及目录检查][完毕]"); } catch (Exception e) { System.out.println("[文件及目录检查][失败,程序结束][原因]" + e.getMessage()); return false; } System.out.println("[启动分发数据监听线程][开始]"); startTime = System.currentTimeMillis(); for (int i = 0; i < DELIVER_DATA_QUEUE_SIZE; i++) { DeliverDataThread ddt = new DeliverDataThread(deliverDataQueue); deliverDataThreadES.execute(ddt); } System.out.println(String.format("[启动分发数据监听线程][完毕][共启动:%s个监听线程]", DELIVER_DATA_QUEUE_SIZE)); return true; } // 2.对源文件进行读取入队处理 public void readFile(File file) throws Exception { System.out.println(String.format("[读取待排序文件][开始][大小:%sM]", file.length() / 1000 / 1000)); // 读监控线程 Thread monitor = new Thread(new Runnable() { @Override public void run() { try { while (true) { System.out.println(String.format("[读取待排序文件][已读:%s行]", hasReaderDataLine.get())); Thread.sleep(5000); } } catch (Exception e) { } } }); monitor.start(); long startTime = System.currentTimeMillis(); FileUtil util = new FileUtil(new FileUtilImpl() { // 每读到一行,应该怎么处理 public void handlerLin(String line) { hasReaderDataLine.incrementAndGet(); // 获取到每一行的数据然后入队! deliverDataQueue.offer(line.trim()); // 这里必须得去换行 } }); util.nioReadFile(file, BSIZE); monitor.interrupt(); finishReadFileTime = (System.currentTimeMillis() - startTime) / 1000; System.out.println(String.format("[读取待排序文件][完毕][一共读取:%S行][用时:%s秒]", hasReaderDataLine.get(), finishReadFileTime, hasReaderDataLine.get())); } // 3.等待分发数据线程把数据分发完毕,然后把线程池里面的线程全部终止 public void waitForFinishWriteDataRange() throws IOException { System.out.println("[数据分发][正在处理中]"); long cleanDeliverDataThreadStartTime = System.currentTimeMillis(); while (true) { if (hasReaderDataLine.get() == hasDataRangeWriteLine.get()) { // 对BW做结束,把内存中残余的数据写到文件 for (Map.Entry<Integer, BufferedWriter> entry : dataWriteMap.entrySet()) { BufferedWriter bw = entry.getValue(); bw.close(); } break; } } deliverDataThreadES.shutdownNow(); dataRangeThreadES.shutdownNow(); finishDeliverFileTime = (System.currentTimeMillis() - cleanDeliverDataThreadStartTime) / 1000; System.out.println( String.format("[数据分发][完毕][已切割成:%s个待排序文件][用时:%s秒]", dataWriteMap.size(), finishDeliverFileTime)); } // 4.排序 public void sort() throws IOException { System.out.println(String.format("[排序][开始][待排序文件数量:%s个][并发排序数量:%s个]", dataWriteMap.size(), SORT_THREAD_SIZE)); long startTime = System.currentTimeMillis(); ExecutorService sortEs = Executors.newFixedThreadPool(SORT_THREAD_SIZE);// 排序线程池 for (Map.Entry<Integer, BufferedWriter> entry : dataWriteMap.entrySet()) { int dataRange = entry.getKey(); SortThread st = new SortThread(dataRange); sortEs.execute(st); } // 监听排序情况 Thread monitor = new Thread(new Runnable() { @Override public void run() { try { while (true) { System.out.println(String.format("[排序][已排好文件:%s个]", hasSortedFile.get())); System.out.println(String.format("总共:[%s] 已读[%s] 已排:[%s]", hasReaderDataLine.get(),hasWaitSortedDataLine.get(),hasSortedDataLine.get())); Thread.sleep(5000); } } catch (Exception e) { } } }); monitor.start(); sortEs.shutdown(); while (true) { if (sortEs.isTerminated()) { finishSortFileTime = (System.currentTimeMillis() - startTime) / 1000; System.out.println(String.format("[排序][完毕][已排好文件:%s个][已排好:%s行][用时:%s秒]", hasSortedFile.get(), hasSortedDataLine.get(), finishSortFileTime)); break; } try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } // while(true){ // if(hasReaderDataLine.get() == hasSortedDataLine.get()){ // finishSortFileTime = (System.currentTimeMillis() - startTime) / 1000; // System.out.println(String.format("[排序][完毕][已排好文件:%s个][已排好:%s行][用时:%s秒]",hasSortedFile.get(),hasSortedDataLine.get(),finishSortFileTime)); // break; // } // try { // Thread.sleep(500); // } catch (InterruptedException e) // { // e.printStackTrace(); // } // } // sortEs.shutdown(); monitor.interrupt(); } // 5.合并 public void combine() throws IOException, InterruptedException { System.out.println(String.format("[合并文件][开始][待合并文件数量:%s个]", dataWriteMap.size())); // 监听合并情况 Thread monitor = new Thread(new Runnable() { @Override public void run() { try { while (true) { System.out.println(String.format("[合并文件][已合并文件:%s个]", hasCombineFile.get())); Thread.sleep(5000); } } catch (Exception e) { } } }); monitor.start(); File f = new File(DES_SORT_DATA_PATH); String[] files = f.list(); // 对文件名称列表做排序,按顺序合并 List<Integer> fileList = new ArrayList<Integer>(); for (String s : files) { fileList.add(Integer.valueOf(s.replaceAll(".txt", ""))); } Collections.sort(fileList); String[] mergeFiles = new String[fileList.size()]; for (int i = 0; i < fileList.size(); i++) { mergeFiles[i] = DES_SORT_DATA_PATH + String.valueOf(fileList.get(i)) + ".txt"; // mergeFiles[i] = String.valueOf(fileList.get(i))+".txt"; } long mergeStartTime = System.currentTimeMillis(); // 用java读写合并文件 combineFile(MERGE_FILE, mergeFiles); // 用系统命令合并文件 // combineFileUseSysCom(MERGE_FILE,mergeFiles); monitor.interrupt(); finishCombineFileTime = (System.currentTimeMillis() - mergeStartTime) / 1000; System.out.println(String.format("[合并文件][完毕][待排序文件大小:%s][合并完成文件大小:%s][用时:%s秒]", new File(SRC_DATA).length(), new File(MERGE_FILE).length(), finishCombineFileTime)); } // 分配队列区间 public int getDataRange(String data) { int dataRange = data.length(); if (isDeliverTen) { if (dataRange != 1) { String dr = data.substring(0, 1); dataRange = Integer.valueOf(dataRange + "" + dr); } } return dataRange; } /** * * @author 838745 * * 分发数据线程 1. 从分发数据队列中取数据 2. 获取该数据的位数 3. 根据位数,把该数据放到相应的数据区间队列中等待处理 * */ final static Object lock = new Object(); class DeliverDataThread extends Thread { ConcurrentLinkedQueue<String> deliverDataQueue; public DeliverDataThread(ConcurrentLinkedQueue<String> deliverDataQueue) { this.deliverDataQueue = deliverDataQueue; } @Override public void run() { try { while (true) { String data = deliverDataQueue.poll(); if (data == null || data.equals("")) { // 如果不休眠,当前线程会不停的循环,CPU都在当前线程上面,无法调度另外的线程. Thread.sleep(0); continue; } // 按照长度范围,把数据放入相关的区间队列 final int dataRange = getDataRange(data); // 数据区间队列 // 对于2位数,分成10个队列 // 10-19 为1个队列,队列名称是21,20-29为1个队列,队列名称是22 // 对于3位数,分钟10个队列 // 100-199 为1个队列,队列名称是31,200-299为1个队列,队列名称是32以此类推 BufferedWriter bw = dataWriteMap.get(dataRange); if (bw == null) { synchronized (lock) { bw = dataWriteMap.get(dataRange); if (bw == null) { // 产生相应的写入对象 bw = new BufferedWriter(new FileWriter(new File(DES_DATA_PATH + dataRange + ".txt")), WRITE_SORT_BSIZE); dataWriteMap.put(dataRange, bw); } } } synchronized (bw) { bw.write(data); bw.newLine(); // 增加已经处理的行数 hasDataRangeWriteLine.incrementAndGet(); } } } catch (InterruptedException e1) { // System.out.println("结束分发线程:"+Thread.currentThread().getName() // + "用时" + (System.currentTimeMillis() - startTime)/1000 + // "S"); } catch (Exception e) { e.printStackTrace(); } } } class DeliverDataThread_bak extends Thread { ConcurrentLinkedQueue<String> deliverDataQueue; public DeliverDataThread_bak(ConcurrentLinkedQueue<String> deliverDataQueue) { this.deliverDataQueue = deliverDataQueue; } @Override public void run() { long startTime = System.currentTimeMillis(); try { while (true) { String data = deliverDataQueue.poll(); if (data == null || data.equals("")) { // 如果不休眠,当前线程会不停的循环,CPU都在当前线程上面,无法调度另外的线程. Thread.sleep(0); continue; } // 按照长度范围,把数据放入相关的区间队列 int dataRange = getDataRange(data); // 数据区间队列 // 对于2位数,分成10个队列 // 10-19 为1个队列,队列名称是21,20-29为1个队列,队列名称是22 // 对于3位数,分钟10个队列 // 100-199 为1个队列,队列名称是31,200-299为1个队列,队列名称是32以此类推 ConcurrentLinkedQueue<String> dataRangQueue = dataRangMap.get(dataRange); if (dataRangQueue == null) { // 创建队列 dataRangQueue = new ConcurrentLinkedQueue<String>(); // 把当前队列放到MAP中,就可以根据数据位数直接拿到队列 dataRangMap.put(dataRange, dataRangQueue); // 产生相应的写入对象 BufferedWriter bw = new BufferedWriter( new FileWriter(new File(DES_DATA_PATH + dataRange + ".txt")), WRITE_SORT_BSIZE); dataWriteMap.put(dataRange, bw); // 启动数据区间队列的监听线程 DataRangeThread rq = new DataRangeThread(dataRange, dataRangQueue); for (int j = 0; j < RANG_QUEUE_SIZE; j++) { dataRangeThreadES.execute(rq); } } // 按数据位数,把数据放到相应的队列中去 dataRangQueue.offer(data); } } catch (InterruptedException e1) { // System.out.println("结束分发线程:"+Thread.currentThread().getName() // + "用时" + (System.currentTimeMillis() - startTime)/1000 + // "S"); } catch (Exception e) { e.printStackTrace(); } } } /** * 数据区间写入线程 * * @author 838745 * * 1. 从队列中获取相应的数据 2. 把该数据写入到相应的数据区间文件中去 * */ class DataRangeThread extends Thread { ConcurrentLinkedQueue<String> dataRangQueue; int rang; public DataRangeThread(int rang, ConcurrentLinkedQueue<String> dataRangQueue) { this.dataRangQueue = dataRangQueue; this.rang = rang; } @Override public void run() { long startTime = System.currentTimeMillis(); try { while (true) { String data = dataRangQueue.poll(); if (data == null || data.equals("")) { // 如果不休眠,当前线程会不停的循环,CPU都耗在当前线程上面,无法调度另外的线程. Thread.sleep(0); continue; } // 按照长度范围,把数据放入相关的区间队列 BufferedWriter bw = dataWriteMap.get(rang); bw.write(data); bw.newLine(); // 增加已经处理的行数 hasDataRangeWriteLine.incrementAndGet(); } } catch (InterruptedException e1) { // System.out.println("结束数据区间线程:"+rang+" " + "用时" + // (System.currentTimeMillis() - startTime)/1000 + "S"); } catch (Exception e) { e.printStackTrace(); } } } /** * 排序线程 * * @author 838745 * */ class SortThread extends Thread { int dataRange; public SortThread(int dataRange) { this.dataRange = dataRange; } public void run() { StringBuilder newlinesBui = null; String lastLine = null; try { int lineCount = 0; long startTime = System.currentTimeMillis(); long startTime2 = System.currentTimeMillis(); final List<BigInteger> data = new ArrayList<BigInteger>(); File dataFile = new File(DES_DATA_PATH + dataRange + ".txt"); if (!dataFile.exists()) { return; } // 读入文件 FileUtil util = new FileUtil(new FileUtilImpl() { // 每读到一行,应该怎么处理 public void handlerLin(String line) { hasWaitSortedDataLine.incrementAndGet(); // 获取到每一行的数据放入集合等待排序 data.add(new BigInteger(line)); } }); util.nioReadFile(dataFile, SORT_READER_BSIZE); // util.randomReadFile(dataFile, SORT_READER_BSIZE); String readEndTime = (System.currentTimeMillis() - startTime) / 1000 + "S"; // 排序 startTime = System.currentTimeMillis(); Collections.sort(data); String sortTime = (System.currentTimeMillis() - startTime) / 1000 + "S"; // 写到文件 startTime = System.currentTimeMillis(); BufferedWriter bw = new BufferedWriter( new FileWriter(new File(DES_SORT_DATA_PATH + dataRange + ".txt")), SORT_WRITE_BSIZE); int i = 0; for (BigInteger b : data) { i = i++; bw.write(b.toString()); bw.newLine(); hasSortedDataLine.incrementAndGet(); // lineCount++; } bw.close(); String writeTime = (System.currentTimeMillis() - startTime) / 1000 + "S"; hasSortedFile.incrementAndGet(); // System.out.println(String.format("数据区间[%s] [文件大小:%sM] 排序[%s]行 // 完成时间[%s] 读[%s] 排[%s] 写[%s]", // dataRange, // dataFile.length()/1000/1000, // lineCount, // (System.currentTimeMillis() - startTime2) / 1000 +"S" // ,readEndTime, // sortTime, // writeTime)); } catch (Exception e) { e.printStackTrace(); } } } // Windos系统COPY合并程序 public void combineFileUseSysCom(String outFile, String[] files) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); for (String f : files) { sb.append(f + "+"); } String cmd = sb.substring(0, sb.length() - 1); System.out.println(cmd); String[] cmds = { "cmd", "/C", "copy", "/Y", cmd, MERGE_FILE.replaceAll("//", "\\\\") }; Process p = Runtime.getRuntime().exec(cmds, null, new File(DES_SORT_DATA_PATH.replaceAll("//", "\\\\"))); BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream())); String line = reader.readLine(); while (line != null) { line = reader.readLine(); System.out.println(line); hasCombineFile.incrementAndGet(); } p.waitFor(); } // JAVA合并程序 public void combineFile(String outFile, String[] files) { FileChannel outChannel = null; try { outChannel = new FileOutputStream(outFile).getChannel(); for (String f : files) { FileChannel fc = new FileInputStream(f).getChannel(); ByteBuffer bb = ByteBuffer.allocate(MERGE_BSIZE); while (fc.read(bb) != -1) { bb.flip(); // 回绕缓冲区,索引重置为开头 outChannel.write(bb); bb.clear(); } fc.close(); hasCombineFile.incrementAndGet(); } } catch (IOException ioe) { ioe.printStackTrace(); } finally { try { if (outChannel != null) { outChannel.close(); } } catch (IOException ignore) { } } } /** * 递归删除目录下的所有文件及子目录下所有文件 * * @param dir * 将要删除的文件目录 * @return boolean Returns "true" if all deletions were successful. If a * deletion fails, the method stops attempting to delete and returns * "false". */ private static boolean deleteDir(File dir) { if (dir.isDirectory()) { String[] children = dir.list(); for (int i = 0; i < children.length; i++) { boolean success = deleteDir(new File(dir, children[i])); if (!success) { return false; } } } // 目录此时为空,可以删除 return dir.delete(); } /************************************************* 读文件工具类 ***************************************/ interface FileUtilImpl { public void handlerLin(String line); } class FileUtil implements FileUtilImpl { FileUtilImpl impl; public FileUtil(FileUtilImpl impl) { this.impl = impl; } // 读到的行应该怎么处理 public void handlerLin(String line) { impl.handlerLin(line); } // nio读文件 public void nioReadFile(File file, int SIZE) throws IOException { String enterStr = "\n"; FileChannel inChannel = new FileInputStream(file).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(SIZE); StringBuilder newlinesBui = new StringBuilder(); while (inChannel.read(buffer) != -1) { buffer.flip(); // ByteBuffer.array() 返回的 array 长度为 ByteBuffer // allocate的长度,并不是里面所含的内容的长度 // 这样会导致,最后读取的肯定不是allocate的长度,但是array返回的带有上一次的冗余数据 // 解决办法如下,重新按照剩余容量来制作一个新的byte byte[] contentBytes; if (buffer.remaining() != buffer.capacity()) { contentBytes = new byte[buffer.remaining()]; buffer.get(contentBytes, 0, contentBytes.length); } else { contentBytes = buffer.array(); } String content = new String(contentBytes); newlinesBui.append(content); int fromIndex = 0; int endIndex = -1; // 循环找到 \n String line; while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1) { // 得到一行 line = newlinesBui.substring(fromIndex, endIndex).trim(); if (line != null && !line.trim().equals("")) { impl.handlerLin(line); } fromIndex = endIndex + 1; } newlinesBui.delete(0, fromIndex); buffer.clear(); } // 最后一行 String lastLine = newlinesBui.substring(0, newlinesBui.length()).trim(); if (lastLine != null && !lastLine.equals("")) { impl.handlerLin(lastLine); } inChannel.close(); } } /************************************************* 读文件工具类 ***************************************/ /************************************************* 内存监控工具类 ***************************************/ static class Memory { public static long getMaxHeapMemory() { MemoryMXBean mmb = ManagementFactory.getMemoryMXBean(); return mmb.getHeapMemoryUsage().getMax(); } public static long getInitHeapMemory() { MemoryMXBean mmb = ManagementFactory.getMemoryMXBean(); return mmb.getHeapMemoryUsage().getInit(); } public static long getUsedHeapMemory() { MemoryMXBean mmb = ManagementFactory.getMemoryMXBean(); return mmb.getHeapMemoryUsage().getUsed(); } public static void print() { System.out.println(String.format("已经使用内存:[%sM] 剩余可用内存:[%sM]", Memory.getUsedHeapMemory() / 1024 / 1024, ((Memory.getMaxHeapMemory() / 1024 / 1024) - (Memory.getUsedHeapMemory() / 1024 / 1024)))); } } /************************************************* 内存监控工具类 ***************************************/ } ``` * 测试代码 ``` package com.bingo; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.math.BigInteger; import java.util.Arrays; import java.util.Random; public class CreateFile { public static Thread ramMonitorT = new Thread(new Runnable() { @Override public void run() { try { while(true){ System.out.println(String.format("已生成文件大小:[%sM]", line/30000)); Thread.sleep(2000); } } catch (Exception e) { } } }); public static String SRC_DATA = "d://temp//bigdata/src/100m.txt"; public static int line = 0; public static void main(String[] args) throws InterruptedException, IOException { ramMonitorT.setDaemon(true); ramMonitorT.start(); //待排序文件 if( (args.length > 0) && !args[0].equals("")){ SRC_DATA = args[0]; } System.out.println("生成文件路径:"+SRC_DATA); //文件大小 int m = 0; if( (args.length > 1) && !args[1].equals("")){ m = Integer.valueOf(args[1]); } System.out.println("生成文件大小:"+m+"M"); BufferedWriter bw = new BufferedWriter(new FileWriter(SRC_DATA)); //文件大小,1M=30000行,100M = 300W行,1G=3000W行,24G=3000W*24 int fileSize = 30000*m; for(int j = 0;j < fileSize;j++){ int rang = (int)(Math.random()*60)+1; StringBuffer num = new StringBuffer(); for(int i = 0; i< rang ; i++){ if(i != 0){ num.append((int)(Math.random()*10)); }else{ num.append((int)(Math.random()*9)+1); } } bw.write(num.toString()); bw.newLine(); line ++; if(j % 10000 == 0){ bw.flush(); } } bw.close(); System.out.println("完!"); } } ```