Kafka巨量数据写文件
最近一个项目用到kafka 客户端接收消息,要求是写入文件(按顺序)。有2个思路:1. 利用log4j去写文件,优点是稳定可靠,文件根据设置,自动分隔大小。缺点是没有办法做到 当写文件到一定数量或者一定时间后,自动切换目录的功能。如果循环写文件,比如设置最多保留10个,那么就需要用shell脚本去定时备份转移这些文件,当然也是能解决的。但是用代码解决这个问题也很简单:
·
最近一个项目用到kafka 客户端接收消息,要求是写入文件(按顺序)。
有2个思路:
1. 利用log4j去写文件。优点是稳定可靠,文件可以根据配置自动按大小分割;缺点是无法在写入一定数量的文件或经过一定时间后自动切换目录。如果循环写文件(比如设置最多保留10个),就需要用shell脚本定时备份、转移这些文件,这当然也能解决问题。不过用代码解决这个问题也很简单:
/**
 * Writes Kafka messages to disk through a log4j {@link RollingFileAppender} whose target
 * directory is named after the time at which the appender was created.
 * <p>
 * The directory name is formatted with {@code yyyyMMddHHmm}, and a fresh appender (hence a
 * fresh directory) is installed whenever the clock moves into a different minute.
 * NOTE(review): the original description gives the hourly example {@code 2018020422};
 * the code rotates per minute - confirm which granularity is intended.
 *
 * @since 2018-2-4
 * @author chaisson
 */
public class MessageHandler {
    private final static Logger logger = Logger.getLogger(MessageHandler.class);

    /** Name under which the file appender is registered on {@link #logger}. */
    private static final String APPENDER_NAME = "kafkaLogAppender";

    /** Length of one rotation period in milliseconds (one minute). */
    private static final long ROTATION_PERIOD_MILLIS = 60000L;

    /** Appender currently attached to {@link #logger}; {@code null} until the first write. */
    private static RollingFileAppender newAppender = null;

    /**
     * Rotation period (minutes since the epoch) in which {@link #newAppender} was created.
     * An absolute value is stored instead of the minute-of-hour so that a pause of exactly
     * one or more hours between two messages still triggers a rotation.
     */
    private static long changeAppenderPeriod = -1L;

    private static String filePath = "D:/log/temp/bak/";
    private static String fileName = "message.log";

    /**
     * Logs one message, first switching to a new appender if the rotation period changed.
     *
     * @param message the text to write; it is emitted with the layout {@code %m%n}
     * @throws IOException if a new appender has to be created and its file cannot be opened
     */
    public static void write(String message) throws IOException {
        changeAppender();
        logger.info(message);
    }

    /**
     * Installs a new appender when none exists yet or when the current time belongs to a
     * different rotation period than the one the active appender was created in.
     *
     * @throws IOException if the new appender's file cannot be opened
     */
    public synchronized static void changeAppender() throws IOException {
        // Sample the clock once so the rotation decision and the directory name agree.
        long now = System.currentTimeMillis();
        long period = now / ROTATION_PERIOD_MILLIS;
        if (newAppender == null || period != changeAppenderPeriod) {
            addAppender(new Date(now));
            changeAppenderPeriod = period;
        }
    }

    /**
     * Creates the appender for the directory derived from {@code when}, swaps it in for the
     * previous one and closes the previous one.
     *
     * @param when instant used to build the directory name
     * @throws IOException if the log file cannot be opened; in that case the previously
     *         installed appender (if any) stays attached
     */
    private static void addAppender(Date when) throws IOException {
        Layout layout = new PatternLayout("%m%n");
        String dir = new SimpleDateFormat("yyyyMMddHHmm").format(when);
        RollingFileAppender replacement =
                new RollingFileAppender(layout, filePath + File.separator + dir + File.separator + fileName);
        replacement.setName(APPENDER_NAME);
        replacement.setMaxFileSize("500MB");
        replacement.setMaxBackupIndex(1000);

        RollingFileAppender previous = newAppender;
        logger.removeAppender(APPENDER_NAME);
        logger.addAppender(replacement);
        newAppender = replacement;

        // removeAppender only detaches the appender; close it explicitly so the file handle
        // of the previous directory is released instead of leaking on every rotation.
        if (previous != null) {
            previous.close();
        }
    }
}
测试代码:
/**
 * Test driver for the log4j based file writer: each instance is a thread that pushes a
 * fixed number of messages through {@code MessageHandler.write} and logs the elapsed time.
 *
 * @since 2018-2-4
 * @author chaisson
 */
public class TestLog4j extends Thread {
    private final Logger logger = Logger.getLogger(TestLog4j.class);

    /** Number handed to the constructor and passed on to the write test. */
    private int no;

    public TestLog4j(int no) {
        this.no = no;
    }

    /**
     * Writes messages numbered 1 to 299999 and logs how many milliseconds that took.
     * The {@code no} argument is only needed by the alternative StringHandler based
     * variant of this test and is not used here.
     */
    private void testWriteFile(int no) throws Exception {
        final String payload = "xxxxxxxxxx";
        final long startedAt = System.currentTimeMillis();
        int seq = 1;
        while (seq < 300000) {
            MessageHandler.write(seq + "-->" + payload);
            seq++;
        }
        final long elapsed = System.currentTimeMillis() - startedAt;
        logger.info("It cost " + elapsed + " ms.");
    }

    /** Runs the write test and prints the stack trace of any failure. */
    @Override
    public void run() {
        try {
            testWriteFile(no);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /** Starts two writer threads that run concurrently. */
    public static void main(String[] args) {
        TestLog4j[] workers = { new TestLog4j(1), new TestLog4j(2) };
        for (TestLog4j worker : workers) {
            worker.start();
        }
    }
}
问题解决,利用log4j写文件,可以轻松应对每小时600W条消息(每条消息长度3500字符左右)的场景,测试发现最高每小时可以写入超过2000W条长内容消息(文件大小60G以上)
2. 自己写文件,优点想怎么写就怎么写,文件目录和名称想怎么变就怎么变,效率高,可以绝对保证消息按顺序写入。缺点稳定性不如log4j,代码占用内存较多,未经过实践检验其可靠性。
/**
 * Buffers Kafka messages in a bounded queue and hands them in batches to a single-threaded
 * pool that appends them to a file.
 * <p>
 * A dispatcher thread takes messages from the queue, groups everything that arrived within
 * a short window into one batch and submits the batch as a {@code WriterFileThread} task.
 * Because the pool has exactly one thread, batches are executed in submission order.
 * Call {@link #shutdown()} to stop the dispatcher and let the pool terminate.
 *
 * @since 2018-2-5
 * @author chaisson
 */
public class StringHandler {
    private final Logger logger = Logger.getLogger(StringHandler.class);

    private ThreadPoolExecutor threadPool;

    /** Bounded blocking queue of pending messages; {@link #write(String)} blocks when full. */
    private ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(999);

    /** Work queue of the thread pool (unbounded; throttled manually by the dispatcher). */
    private LinkedBlockingQueue<Runnable> poolQueue = new LinkedBlockingQueue<Runnable>();

    /** A new file is started once the current one exceeds this size (300 MB). */
    public int m_MaxFileLength = 1024 * 1024 * 300;

    private int serialNo;
    private String basePath = "D:/log/temp/bak/";
    private String fileName = "message.log";
    private File file = null;

    /** Dispatcher thread created by the most recent {@link #start(StringHandler)} call. */
    private volatile Thread dispatcher;

    /**
     * Creates the handler and immediately starts its dispatcher thread and thread pool.
     *
     * @param serialNo number identifying this handler; exposed through {@link #getSerialNo()}
     */
    public StringHandler(int serialNo){
        this.serialNo = serialNo;
        this.start(this);
    }

    /**
     * Creates the single-threaded pool and starts the dispatcher thread.
     *
     * @param object the handler passed to every {@code WriterFileThread} task
     */
    public void start(final StringHandler object){
        // A pool of size 1 is the slowest option but guarantees strictly ordered execution.
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, poolQueue);
        threadPool = pool;
        Runnable exeOut = new Runnable(){
            public void run() {
                while(true){
                    // Declared outside the try so a batch in progress survives an interrupt.
                    Collection<String> lines = new ArrayList<String>();
                    try {
                        lines.add(messageQueue.take());
                        // Short pause so that more messages accumulate into the same batch.
                        Thread.sleep(100);
                        messageQueue.drainTo(lines);
                        logger.info("will write " + lines.size() + " to file.");
                        while(poolQueue.size() > 20){
                            // Limit memory use: stop submitting while more than 20 write
                            // tasks are queued (recommended not to exceed 100).
                            Thread.sleep(100);
                        }
                        pool.execute(new WriterFileThread(object,lines));
                    } catch (InterruptedException e) {
                        // Interrupt means "stop": flush what is still buffered, let the pool
                        // finish its queued tasks, and end the thread instead of looping on.
                        messageQueue.drainTo(lines);
                        if(!lines.isEmpty()){
                            logger.info("will write " + lines.size() + " to file.");
                            pool.execute(new WriterFileThread(object,lines));
                        }
                        pool.shutdown();
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };
        Thread t = new Thread(exeOut);
        dispatcher = t;
        t.start();
    }

    /**
     * Stops the dispatcher thread. Messages already queued are submitted for writing and
     * the pool is shut down once they are processed. Messages passed to
     * {@link #write(String)} afterwards are not processed.
     */
    public void shutdown(){
        Thread worker = dispatcher;
        if(worker != null){
            worker.interrupt();
        }
    }

    /**
     * Queues one message for writing, blocking while the queue is full.
     * If the calling thread is interrupted while waiting, the message is dropped, the
     * error is logged and the thread's interrupt status is restored for the caller.
     *
     * @param msg the message to queue
     */
    public void write(String msg){
        try {
            messageQueue.put(msg);
        } catch (InterruptedException e) {
            logger.error(e, e);
            Thread.currentThread().interrupt();
        }
    }

    /** @return the file currently being written, or {@code null} if none was created yet */
    public File getFile() {
        return file;
    }

    /** @param file the file that subsequent write tasks should append to */
    public void setFile(File file) {
        this.file = file;
    }

    /** @return the number given to the constructor */
    public int getSerialNo() {
        return serialNo;
    }

    /** @return the base directory under which files are created */
    public String getBasePath() {
        return basePath;
    }

    /** @return the base name of the files that are created */
    public String getFileName() {
        return fileName;
    }
}
/**
 * Task that appends one message and/or one batch of lines to the file currently owned by
 * a {@code StringHandler}, starting a new file when none exists or the current one has
 * grown beyond {@code m_MaxFileLength}.
 */
class WriterFileThread implements Runnable {
    private final Logger logger = Logger.getLogger(WriterFileThread.class);

    /** Static, so it is shared by every task instance: only one task writes at a time. */
    private static final ReentrantLock lock = new ReentrantLock();

    private StringHandler object;
    private String msg;
    private Collection<String> lines;

    /**
     * @param object handler that owns the target file
     * @param lines  lines to append, one per line of output; may be {@code null}
     */
    public WriterFileThread(StringHandler object, Collection<String> lines) {
        this.object = object;
        this.lines = lines;
    }

    /**
     * @param object handler that owns the target file
     * @param msg    text written as-is before the lines; may be {@code null}
     * @param lines  lines to append, one per line of output; may be {@code null}
     */
    public WriterFileThread(StringHandler object, String msg, Collection<String> lines) {
        this.object = object;
        this.msg = msg;
        this.lines = lines;
    }

    /**
     * Appends the payload to the handler's file. I/O failures are logged, not rethrown.
     */
    public void run() {
        lock.lock();
        FileWriter fw = null;
        BufferedWriter bw = null;
        try {
            File target;
            // Pick (or create) the target file while holding the handler's monitor.
            synchronized (object) {
                if(object.getFile() == null || object.getFile().length() > object.m_MaxFileLength){
                    logger.info("serialNo " + object.getSerialNo());
                    object.setFile(createNewFile());
                }
                target = object.getFile();
            }
            // NOTE(review): FileWriter encodes with the platform default charset; confirm
            // whether a fixed charset such as UTF-8 is required for these files.
            fw = new FileWriter(target, true);
            bw = new BufferedWriter(fw);
            if(msg != null){
                IOUtils.write(msg, bw);
            }
            if(lines != null){
                IOUtils.writeLines(lines,null,bw);
            }
            // Flush here, inside the try: closeQuietly below discards any IOException, so a
            // failed flush during close would otherwise lose data without a log entry.
            bw.flush();
        } catch (IOException e) {
            logger.error(e, e);
        } finally {
            IOUtils.closeQuietly(bw);
            IOUtils.closeQuietly(fw);
            lock.unlock();
        }
    }

    /**
     * Builds the path of the next file and makes sure its directory exists. The path is
     * {@code <basePath>/Thread_<serialNo>/<yyyyMMddHH>/<fileName>_<yyyyMMddHHmmssSSS>}.
     * The file itself is not created here; it comes into existence on first write.
     *
     * @return the file that subsequent writes should append to
     */
    private File createNewFile() {
        // One timestamp for both parts so directory and file suffix cannot disagree
        // when the call happens right at an hour boundary.
        Date now = new Date();
        String dir1 = "Thread_" + object.getSerialNo();
        String dir2 = new SimpleDateFormat("yyyyMMddHH").format(now);
        String namedSuffix = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(now);
        File file = new File(object.getBasePath() + File.separator + dir1 + File.separator + dir2 + File.separator + object.getFileName() + "_"
                + namedSuffix);
        File parent = file.getParentFile();
        // mkdirs returns false both on failure and when another thread created the
        // directory first, hence the second exists() check before reporting an error.
        if (!parent.exists() && !parent.mkdirs() && !parent.exists()) {
            logger.error("Failed to create directory " + parent);
        }
        return file;
    }
}
测试代码:
/**
 * Pushes messages numbered 1 to 299999 through a new StringHandler and logs how long
 * queuing them took.
 *
 * @param no serial number given to the StringHandler
 */
private void testWriteFile(int no) throws Exception {
    final StringHandler handler = new StringHandler(no);
    final String payload = "xxxx";
    logger.info("消息长度:" + payload.length());
    final long startedAt = System.currentTimeMillis();
    int seq = 1;
    while (seq < 300000) {
        handler.write(":" + seq + "-->" + payload);
        seq++;
    }
    final long elapsed = System.currentTimeMillis() - startedAt;
    logger.info("It cost " + elapsed + " ms.");
}
经测试,每小时能轻松应对超过1000W条的长内容消息(字符长度超过3000)。
如果提高消息队列messageQueue的最大容量(消息缓存量),最高可以处理2000W条以上的长内容消息。
如果不要求消息按顺序写入,那么适当增大线程池大小,最高可以处理3000W条以上的长内容消息。
更多推荐
已为社区贡献1条内容
所有评论(0)