A recent project used a Kafka consumer client to receive messages, with the requirement that they be written to files in order.

There are two approaches:

1. Use log4j to write the files. The advantage is that it is stable and reliable, and files are rolled automatically at a configured size. The drawback is that log4j cannot, on its own, switch to a new output directory after a certain number of files or a certain amount of time. If files are written in a rolling fashion, say keeping at most 10 backups, you would need a shell script to periodically back up and move them. That works, but solving it in code is also simple:
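For reference, the size-based rolling that log4j provides out of the box is normally configured in `log4j.properties`. The sketch below is an assumption for illustration (the appender name `KAFKA` and the paths mirror the code later in this post, not a configuration the project actually shipped):

```properties
# Hypothetical log4j 1.x setup: size-based rolling only,
# keeping at most 10 backups (message.log.1 ... message.log.10)
log4j.logger.MessageHandler=INFO, KAFKA
log4j.appender.KAFKA=org.apache.log4j.RollingFileAppender
log4j.appender.KAFKA.File=D:/log/temp/bak/message.log
log4j.appender.KAFKA.MaxFileSize=500MB
log4j.appender.KAFKA.MaxBackupIndex=10
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%m%n
```

This covers size-based rolling; the directory switching described above still has to be done in code, as follows.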

/**
 * @desc   Writes Kafka message data to files via log4j, naming each
 *         directory after the current point in time;
 *         <p> e.g. 2018020422 is the directory created at 22:00 on 2018-02-04
 * @since  2018-2-4
 * @author chaisson
 *
 */
public class MessageHandler {
	
	private final static Logger logger = Logger.getLogger(MessageHandler.class);
	
	private static RollingFileAppender newAppender = null;
	
	//the hour at which the log4j appender was last switched
	private static int changeAppenderHour = 0;
	
	private static String filePath = "D:/log/temp/bak/";
	
	private static String fileName = "message.log";
	
	public static void write(String message) throws IOException {
		changeAppender();
		logger.info(message);
	}
	
	//switch to a new appender when the hour rolls over
	public synchronized static void changeAppender() throws IOException {
		int currentHour = Calendar.getInstance().get(Calendar.HOUR_OF_DAY);
		if(newAppender == null || currentHour != changeAppenderHour){
			addAppender();
			changeAppenderHour = currentHour;
		}
	}
	
	//attach a fresh appender pointing into a new time-named directory
	private static void addAppender() throws IOException {
		Layout layout = new PatternLayout("%m%n");
		String dir = new SimpleDateFormat("yyyyMMddHH").format(new Date());
		RollingFileAppender appender = new RollingFileAppender(layout, filePath + dir + File.separator + fileName);
		appender.setName("kafkaLogAppender");
		appender.setMaxFileSize("500MB");
		appender.setMaxBackupIndex(1000);
		logger.removeAppender("kafkaLogAppender");
		if(newAppender != null){
			newAppender.close(); //release the previous hour's file handle
		}
		logger.addAppender(appender);
		newAppender = appender;
	}
}
Test code:

/**
 * @desc   Tests writing files via log4j
 * @since  2018-2-4
 * @author chaisson
 *
 */
public class TestLog4j extends Thread {
	
	private final Logger logger = Logger.getLogger(TestLog4j.class);
	
	private int no;
	
	public TestLog4j (int no){
		this.no = no;
	}
	
	private void testWriteFile(int no) throws Exception{
		//StringHandler handler = new StringHandler(no);
		String s = "xxxxxxxxxx";
		//logger.info("message length: " + s.length());
		long t1 = System.currentTimeMillis();
		for(int i = 1;i <300000;i++){
			MessageHandler.write(i+"-->"+s);
			//handler.write(":"+i+"-->"+s);
		}
		long t2 = System.currentTimeMillis();
		logger.info("It cost " + (t2 -t1) + " ms.");
	}
	
	@Override
	public void run(){
		try {
			testWriteFile(no);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		TestLog4j t1 = new TestLog4j(1);
		TestLog4j t2 = new TestLog4j(2);
		t1.start();
		t2.start();
	}
}
Problem solved. Writing files via log4j easily handles 6 million messages per hour (each around 3,500 characters); testing showed it can write more than 20 million long messages per hour (over 60 GB of files).


2. Write the files yourself. The advantages: full control over how files are written, directories and file names can change however you like, throughput is high, and message order can be strictly guaranteed. The drawbacks: less battle-tested than log4j, higher memory usage, and its reliability has not been proven in production.
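Before the full implementation, the core batching idea (a bounded queue absorbs producer bursts, and the writer blocks for one message, then drains everything else already queued) can be sketched in isolation. The class and method names here are illustrative, not part of the code below:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Sketch of the batching idea: a bounded queue absorbs producer bursts,
// and the writer blocks for one message, then drains everything else
// that is already queued so a whole batch is written in one pass.
public class BatchDrainDemo {

    static List<String> drainBatch(ArrayBlockingQueue<String> queue) throws InterruptedException {
        List<String> batch = new ArrayList<String>();
        batch.add(queue.take());   // block until at least one message arrives
        queue.drainTo(batch);      // then move everything else queued into the batch
        return batch;              // order inside the batch matches arrival order
    }

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(999);
        for (int i = 1; i <= 5; i++) {
            queue.put("msg-" + i);
        }
        List<String> batch = drainBatch(queue);
        System.out.println(batch.size());   // prints 5
        System.out.println(batch.get(0));   // prints msg-1
    }
}
```

Because `drainTo` moves only what is already in the queue, each batch is bounded by the queue capacity, and arrival order is preserved within and across batches.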

/**
 * @desc   Writes Kafka messages to files
 * @since  2018-2-5
 * @author chaisson
 *
 */
public class StringHandler {
	
	private final Logger logger = Logger.getLogger(StringHandler.class);
	
	private ThreadPoolExecutor threadPool;
	
	//bounded blocking queue that buffers incoming messages
	private ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(999);
	
	//work queue for the thread pool
	private LinkedBlockingQueue<Runnable> poolQueue = new LinkedBlockingQueue<Runnable>();
	
	public int m_MaxFileLength = 1024 * 1024 * 300; //roll to a new file once it exceeds 300 MB
	
	private int serialNo;
	
	private String basePath = "D:/log/temp/bak/";
	
	private String fileName = "message.log";
	
	private File file = null;
	
	public StringHandler(int serialNo){
		this.serialNo = serialNo;
		this.start(this);
	}
	
	public void start(final StringHandler object){
		//a pool size of 1 is the slowest option, but it guarantees tasks execute strictly in order
		threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, poolQueue);
		Runnable exeOut = new Runnable(){
			public void run() {
				while(true){
					try {
						String msg = messageQueue.take();
						Thread.sleep(100); //brief pause so more messages can accumulate into this batch
						Collection<String> lines = new ArrayList<String>();
						lines.add(msg);
						//drain everything currently queued into this batch
						messageQueue.drainTo(lines);
						logger.info("will write " + lines.size() + " to file.");
						while(poolQueue.size() > 20){
							//avoid excessive memory use: once 20 write tasks are pending,
							//stop submitting new ones (values above 100 are not recommended)
							Thread.sleep(100);
						}
						Runnable task = new WriterFileThread(object, lines);
						threadPool.execute(task);
					} catch (InterruptedException e) {
						logger.error(e, e);
					} 
				}
			}
		};
		Thread t = new Thread(exeOut);
		t.start();
	}
	
	public void write(String msg){
		try {
			messageQueue.put(msg);
		} catch (InterruptedException e) {
			logger.error(e, e);
		}
	}

	public File getFile() {
		return file;
	}

	public void setFile(File file) {
		this.file = file;
	}

	public int getSerialNo() {
		return serialNo;
	}

	public String getBasePath() {
		return basePath;
	}

	public String getFileName() {
		return fileName;
	}
}

class WriterFileThread implements Runnable {
	
	private final Logger logger = Logger.getLogger(WriterFileThread.class);
	
	private static final ReentrantLock lock = new ReentrantLock(); 
	
	private StringHandler object;
	 
    private String msg;
    
    private Collection<String> lines;
    
    public WriterFileThread(StringHandler object, Collection<String> lines) {
    	this.object = object;
        this.lines = lines;
    }
 
    public WriterFileThread(StringHandler object, String msg, Collection<String> lines) {
    	this.object = object;
        this.msg = msg;
        this.lines = lines;
    }
 
    public void run() {
    	lock.lock();
    	FileWriter fw = null;  
    	BufferedWriter bw = null;
        try {
			
			//create or roll the output file when it is absent or exceeds the size limit
			synchronized (object) {
				if(object.getFile() == null || object.getFile().length() > object.m_MaxFileLength){
					logger.info("serialNo " + object.getSerialNo());
					object.setFile(createNewFile());
				}
			}
        	fw = new FileWriter(object.getFile(), true);
    		bw = new BufferedWriter(fw); 
        	if(msg != null){
        		//OutputStream os = new FileOutputStream(file, true); 
        		IOUtils.write(msg, bw);
        	}
        	if(lines != null){
        		//OutputStream os = new FileOutputStream(file, true);  
        		//IOUtils.writeLines(lines, null, os, "UTF-8");  
        		IOUtils.writeLines(lines,null,bw);
        	}
		} catch (IOException e) {
			logger.error(e, e);
		} finally {
			IOUtils.closeQuietly(bw);  
            IOUtils.closeQuietly(fw);
            lock.unlock();
		}
    }
    
	private File createNewFile() {
		String dir1 = "Thread_" + object.getSerialNo();
		String dir2 = new SimpleDateFormat("yyyyMMddHH").format(new Date());
		String namedSuffix = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date());
		File file = new File(object.getBasePath() + File.separator + dir1 + File.separator + dir2 + File.separator + object.getFileName() + "_"
				+ namedSuffix);
		if (!file.getParentFile().exists()) {
			file.getParentFile().mkdirs();
		}
		return file;
	}
}
Test code:
	private void testWriteFile(int no) throws Exception{
		StringHandler handler = new StringHandler(no);
		String s = "xxxx";
		logger.info("message length: " + s.length());
		long t1 = System.currentTimeMillis();
		for(int i = 1;i <300000;i++){
			//MessageHandler.write(i+"-->"+s);
			handler.write(":"+i+"-->"+s);
		}
		long t2 = System.currentTimeMillis();
		logger.info("It cost " + (t2 -t1) + " ms.");
	}
Testing shows this approach easily handles more than 10 million long messages per hour (over 3,000 characters each).
Raising the maximum capacity of messageQueue (the message buffer) pushes that beyond 20 million long messages per hour.
If ordered writes are not required, a suitably sized thread pool can handle more than 30 million long messages per hour.
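The ordering guarantee above rests entirely on the pool size of 1: a single worker draining a FIFO queue runs tasks in exactly the order they were submitted, which is why enlarging the pool trades ordering for throughput. A minimal sketch (class and method names are illustrative):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: a pool with core/max size 1 over a FIFO work queue executes
// tasks strictly in submission order; this is what keeps file writes ordered.
public class SingleThreadOrderDemo {

    static List<Integer> runOrdered(int taskCount) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        List<Integer> out = new CopyOnWriteArrayList<Integer>();
        for (int i = 0; i < taskCount; i++) {
            final int n = i;
            pool.execute(() -> out.add(n)); // each task appends its own number
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return out; // with one worker this is always 0..taskCount-1 in order
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> out = runOrdered(100);
        System.out.println(out.get(0) + ".." + out.get(99)); // prints 0..99
    }
}
```

With two or more workers the same code could interleave `out` arbitrarily, which is the trade-off the last paragraph describes.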






