The project needs to write files to HDFS; this lays the groundwork for the Kafka-to-HDFS writer that comes later (a rough sketch of that consumer loop appears at the end of this post).

The example is as follows:


1. Configuration file: com/xiefg/config/system.properties

# Paths to the installed Hadoop configuration files

core.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/core-site.xml
hdfs.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/hdfs-site.xml
yarn.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/yarn-site.xml
mapred.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/mapred-site.xml
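
Before wiring these paths into code, it helps to verify that they resolve to a usable Hadoop Configuration. A minimal check (fs.defaultFS should print the cluster's NameNode URI rather than the local default file:///):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class ConfigCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // the same site files listed in system.properties above
        conf.addResource(new Path("/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/core-site.xml"));
        conf.addResource(new Path("/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/hdfs-site.xml"));
        // spot-check a key set by the site files to confirm they were picked up
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
    }
}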

2. Utility class for reading the configuration file:

package com.xiefg.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

/***
 *
  * @ClassName: PropertiesUtils
  * @Description: loads system.properties from the classpath and exposes the Hadoop site-file paths
  * @author Comsys-xiefg
  * @date 2017-02-05 14:41:52
  *
 */
public class PropertiesUtils {

    private static ResourceBundle resources = null;

    public static String HDFS_PATH = null;
    public static String YARN_PATH = null;
    public static String CORE_PATH = null;
    public static String MAPRED_PATH = null;

    static {
        InputStream in = null;
        try {
            /* Alternative: load from an external file instead of the classpath
            String configPath = System.getProperty("user.dir") + "/config/system.properties";
            in = new BufferedInputStream(new FileInputStream(configPath)); */

            in = Thread.currentThread().getContextClassLoader().getResourceAsStream(
                    "com/xiefg/config/system.properties");
            resources = new PropertyResourceBundle(in);

            // initialize the Hadoop configuration-file paths
            initHadoopConfig();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try { in.close(); } catch (IOException e) { /* ignore */ }
            }
        }
    }

    /**
     *
    * @Title: initHadoopConfig
    * @Description: read the four Hadoop site-file paths from the properties file
     */
    public static void initHadoopConfig() {
        HDFS_PATH   = resources.getString("hdfs.path");
        YARN_PATH   = resources.getString("yarn.path");
        CORE_PATH   = resources.getString("core.path");
        MAPRED_PATH = resources.getString("mapred.path");
    }

    /**
     * Build a Hadoop Configuration from the four site files.
     */
    public static Configuration getHDFSConf() {
        Configuration conf = new Configuration();
        conf.addResource(new Path(HDFS_PATH));
        conf.addResource(new Path(CORE_PATH));
        conf.addResource(new Path(MAPRED_PATH));
        conf.addResource(new Path(YARN_PATH));
        return conf;
    }

    /**
     * Get the value of the given property.
     * @param property the property name
     * @return the value, or "" if the property is missing
     */
    public static String getPropertiesValue(String property) {
        String val = "";
        try {
            val = resources.getString(property);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return val;
    }

    public static void main(String[] args) {
        Configuration conf = PropertiesUtils.getHDFSConf();
        try {
            HdfsFileUtil.appendFile(conf, "/test/kafka", "kafka test to hdfs xiefg".getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }

        //System.out.println(PropertiesUtils.getPropertiesValue(KafkaProperties.ZK));
    }

}
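
The commented-out lines in the static block above show an alternative worth keeping in mind: loading the properties from an external file next to the launch directory instead of from the classpath, so paths can change without repackaging the jar. A sketch of that variant (assuming a ./config/system.properties under user.dir):

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;

public class ExternalConfigLoader {
    public static ResourceBundle load() throws Exception {
        // assumption: the file sits at ./config/system.properties under the launch directory
        String configPath = System.getProperty("user.dir") + "/config/system.properties";
        InputStream in = new BufferedInputStream(new FileInputStream(configPath));
        try {
            return new PropertyResourceBundle(in);
        } finally {
            in.close();
        }
    }
}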


3. HDFS file utility class:

/**
 *
 */
package com.xiefg.util;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 *
  * @ClassName: HdfsFileUtil
  * @Description: utility methods for creating, appending to, deleting and checking HDFS files
  * @author Comsys-xiefg
  * @date 2017-02-06 10:23:13
  *
 */
public class HdfsFileUtil {

    /***
     *
    * @Title: deleteHfile
    * @Description: recursively delete the file or directory at the given HDFS path
    * @param conf
    * @param ioPath
    * @throws IOException
     */
    public static void deleteHfile(Configuration conf, String ioPath)
            throws IOException {
        FileSystem fileSystem = null;
        try {
            fileSystem = FileSystem.get(conf);
            fileSystem.delete(new Path(ioPath), true);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fileSystem != null) {
                try { fileSystem.close(); } catch (IOException e) { /* ignore */ }
            }
        }
    }

    /**
     *
    * @Title: createFile
    * @Description: create a file and write the given content to it
    * @param conf
    * @param file
    * @param content
    * @throws IOException
     */
    public static void createFile(Configuration conf, String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        byte[] buff = content.getBytes();
        FSDataOutputStream os = null;
        try {
            os = fs.create(new Path(file));
            os.write(buff, 0, buff.length);
            os.flush();
            System.out.println("Create: " + file);
        } finally {
            if (os != null)
                os.close();
            fs.close();
        }
    }

    /**
     *
    * @Title: appendFile
    * @Description: create the file if it does not exist, otherwise append the bytes to it
    * @param conf
    * @param file
    * @param buff
    * @throws IOException
     */
    public static void appendFile(Configuration conf, String file, byte[] buff) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(file);
        if (!fs.exists(path)) {
            fs.close();
            createFile(conf, file, new String(buff, "UTF-8"));
        } else {
            FSDataOutputStream os = null;
            try {
                os = fs.append(path);
                os.write(buff, 0, buff.length);
                os.flush();
            } finally {
                if (os != null)
                    os.close();
                fs.close();
            }
        }
    }

    /**
     *
    * @Title: isExist
    * @Description: check whether the given HDFS path exists
    * @param path
    * @param conf
    * @return whether the path exists
    * @throws IOException
     */
    public static boolean isExist(String path, Configuration conf) throws IOException {
        FileSystem fs = null;
        boolean isexists = false;
        try {
            Path p = new Path(path);
            fs = p.getFileSystem(conf);
            isexists = fs.exists(p);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fs != null) {
                fs.close();
            }
        }
        return isexists;
    }
}
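
One caveat about appendFile: on small clusters (fewer datanodes than the replication factor), an append can abort during pipeline recovery with a "Failed to replace a bad datanode on the existing pipeline" error. A common client-side workaround, assuming a Hadoop 2.x client such as the CDH 5.5 one used here, is to relax the datanode-replacement policy before calling the utility:

Configuration conf = PropertiesUtils.getHDFSConf();
// assumption: Hadoop 2.x client; these keys relax pipeline recovery so an
// append does not fail outright when no replacement datanode is available
conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");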

Package the classes into a jar, copy it to the Hadoop cluster, and run:

 hadoop jar /usr/etl.jar  com.xiefg.util.PropertiesUtils
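
Since PropertiesUtils loads system.properties through the classpath, the file must be packaged inside the jar; a quick way to confirm before running (assuming the jar path above):

 jar tf /usr/etl.jar | grep system.properties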

After it runs, you can see the generated directory and file contents in HDFS.
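
Finally, here is where this is heading: a sketch of how the eventual Kafka-to-HDFS writer could drive these utilities. It assumes the Kafka 0.8 high-level consumer API, a hypothetical topic named "test", and ZooKeeper on localhost; it is an illustration under those assumptions, not the final pipeline:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaToHdfs {
    public static void main(String[] args) throws Exception {
        // hypothetical connection settings -- adjust to the real cluster
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "hdfs-writer");

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // one stream for the hypothetical topic "test"
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("test", 1));

        Configuration conf = PropertiesUtils.getHDFSConf();
        for (MessageAndMetadata<byte[], byte[]> msg : streams.get("test").get(0)) {
            // append every message to the same HDFS file; opening a FileSystem
            // per message is fine for a demo but should be batched in production
            HdfsFileUtil.appendFile(conf, "/test/kafka", msg.message());
        }
    }
}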
