Real-Time Product Recommendation System for a Shopping Platform (Part 4)
I. Log collection with Flume + Kafka
----------------------------------------------------------
1. Install Flume on every Nginx server in the cluster.
2. Configure Flume and distribute the configuration.
Use a spooldir source to monitor the /soft/nginx/logs/flume directory.
[/soft/flume/conf/eshop.conf]
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /soft/nginx/logs/flume
a1.sources.r1.fileHeader = true
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = eshop
a1.sinks.k1.kafka.bootstrap.servers = s200:9092,s300:9092,s400:9092
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the Kafka cluster.
a) Start the ZooKeeper ensemble
$>zkServer.sh start //s100
$>zkServer.sh start //s200
$>zkServer.sh start //s300
b) Start the Kafka brokers
//s200
$>kafka-server-start.sh /soft/kafka/config/server.properties
//s300
$>kafka-server-start.sh /soft/kafka/config/server.properties
//s400
$>kafka-server-start.sh /soft/kafka/config/server.properties
c) Create the eshop topic
$> kafka-topics.sh --zookeeper s100:2181 --topic eshop --create --partitions 3 --replication-factor 3
d) List the topics
$> kafka-topics.sh --zookeeper s100:2181 --list
e) Start the Flume agents
//s201
$>cd /soft/flume/conf
$>flume-ng agent -f /soft/flume/conf/eshop.conf -n a1
// Repeat the same command on s202, s203, and s204
f) Start the HDFS cluster
$>start-dfs.sh
II. Create the Kafka consumer module EshopConsumer -- write the raw log data to HDFS for storage
------------------------------------------------------------------------------
1. Add the Maven dependencies.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>EshopConsumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>
</project>
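Before wiring anything to HDFS, it can help to confirm that log lines actually reach the eshop topic. Below is a minimal sanity-check sketch (not part of the original module) that uses the new consumer API from the kafka-clients dependency above; the class name TopicSanityCheck and the group id sanity-check are made up, and the broker list is assumed from the Flume sink configuration in section I.
package com.test.kafkaconsumer;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
 * Hypothetical sanity check: print a few raw log lines from the eshop topic.
 */
public class TopicSanityCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker list assumed from the Flume Kafka sink configuration
        props.put("bootstrap.servers", "s200:9092,s300:9092,s400:9092");
        props.put("group.id", "sanity-check"); // made-up group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
        consumer.subscribe(Collections.singletonList("eshop"));
        // poll a handful of batches and dump the messages to stdout
        for (int i = 0; i < 10; i++) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> rec : records) {
                System.out.println(new String(rec.value()));
            }
        }
        consumer.close();
    }
}
Run it once while requesting a few pages through Nginx; if nothing is printed, check the Flume agents and the topic before moving on.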
2. Create the package com.test.kafkaconsumer.
3. Create the HDFS output stream pool HDFSOutputStreamPool.java.
package com.test.kafkaconsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.HashMap;
import java.util.Map;
/**
* Output stream pool
*/
public class HDFSOutputStreamPool {
private FileSystem fs;
//all open output streams, keyed by path
private Map<String, FSDataOutputStream> pool = new HashMap<String, FSDataOutputStream>();
private static HDFSOutputStreamPool instance;
private HDFSOutputStreamPool() {
try {
Configuration conf = new Configuration();
fs = FileSystem.get(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
public static HDFSOutputStreamPool getInstance() {
if (instance == null) {
instance = new HDFSOutputStreamPool();
}
return instance;
}
/**
* Get the output stream for the given path
*/
public FSDataOutputStream getOutputStream(String path) {
try{
FSDataOutputStream out = pool.get(path);
if(out == null){
Path p = new Path(path);
if(!fs.exists(p)){
out = fs.create(p);
pool.put(path,out);
}
else{
out = fs.append(p);
pool.put(path,out);
}
}
return out ;
}
catch(Exception e){
e.printStackTrace();
}
return null ;
}
}
4. Create the HDFS writer HDFSWriter.java.
package com.test.kafkaconsumer;
import org.apache.hadoop.fs.FSDataOutputStream;
/**
* HDFS writer class
*/
public class HDFSWriter {
/**
* Write a log record to an HDFS file, e.g.
* hdfs://mycluster/eshop/2017/02/28/s201.log | s202.log | s203.log
*/
public void writeLog2HDFS(String path, byte[] log) {
try {
FSDataOutputStream out = HDFSOutputStreamPool.getInstance().getOutputStream(path);
out.write(log);
out.write("\r\n".getBytes());
out.hsync();
out.hflush();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5. Create the string parsing utility StringUtil.java.
package com.test.kafkaconsumer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
/**
* String utility class
*/
public class StringUtil {
/**
* | is a regex metacharacter, so it has to be escaped
*/
private static final String token = "\\|\\|\\|" ;
/**
* Split a single log line
*/
public static String[] splitLog(String log){
String[] arr = log.split(token);
return arr ;
}
public static String getHostname(String[] arr){
return arr[0];
}
/**
* Returns a string like 2017/02/28/12/12
*/
public static String formatYyyyMmDdHhMi(String[] arr){
try {
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
Date d = sdf.parse(arr[3].split(" ")[0]);
SimpleDateFormat localSDF = new SimpleDateFormat("yyyy/MM/dd/HH/mm", Locale.US);
return localSDF.format(d);
}
catch(Exception e){
e.printStackTrace();
}
return null ;
}
}
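For reference, a small hypothetical check of how a raw log line is parsed. The sample line below is made up, assuming the Nginx log_format configured earlier in this series joins its fields with ||| (hostname first, request timestamp in the fourth field):
package com.test.kafkaconsumer;
/**
 * Hypothetical demo of StringUtil on a made-up log line.
 */
public class StringUtilDemo {
    public static void main(String[] args) {
        // sample "|||"-separated line with 10 fields, matching the assumed nginx log_format
        String line = "s201|||192.168.1.17|||-|||28/Feb/2017:17:35:02 +0800"
                + "|||GET /phone/p1.html HTTP/1.1|||200|||4326|||-|||Mozilla/5.0|||-";
        String[] arr = StringUtil.splitLog(line);
        System.out.println(StringUtil.getHostname(arr));        // s201
        System.out.println(StringUtil.formatYyyyMmDdHhMi(arr)); // 2017/02/28/17/35
    }
}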
6. Create the Kafka consumer class HDFSRawConsumer.java.
package com.test.kafkaconsumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* Raw-data HDFS consumer --- copies the log data to HDFS as-is
*/
public class HDFSRawConsumer {
private final ConsumerConnector consumerConn;
private final String topic = "eshop";
private static HDFSWriter writer = new HDFSWriter();
public HDFSRawConsumer() {
Properties props = new Properties();
props.put("zookeeper.connect", "s100:2181");
props.put("group.id", "gg1");
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
// create the consumer connector
consumerConn = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
}
/**
* Process the log messages
*/
public void processLog() {
// specify the topic to consume
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, new Integer(1));
// create the message streams
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConn.createMessageStreams(topicCount);
// get the list of streams for the topic
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
//
ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
//iterate over the log messages
while (consumerIte.hasNext()) {
byte[] msg = consumerIte.next().message();
String log = new String(msg) ;
String[] arr = StringUtil.splitLog(log);
if(arr == null || arr.length < 10){
continue ;
}
//hostname
String hostname = StringUtil.getHostname(arr);
//date string
String dateStr = StringUtil.formatYyyyMmDdHhMi(arr);
//path
String rawPath = "/data/eshop/raw/" + dateStr + "/" + hostname + ".log";
//write the data to HDFS
System.out.println(log);
writer.writeLog2HDFS(rawPath, msg);
}
}
}
public static void main(String[] args) {
HDFSRawConsumer consumer = new HDFSRawConsumer();
consumer.processLog();
}
}
7. Run a test and verify that the data is written to HDFS successfully.
III. Optimizing the stream pool + decorating the output stream
-------------------------------------------------------
1. Decorate FSDataOutputStream with MyFSDataOutputStream.java.
package com.test.kafkaconsumer;
import org.apache.hadoop.fs.FSDataOutputStream;
import java.io.IOException;
/**
* Decorator stream that can be returned to the pool instead of being closed
*/
public class MyFSDataOutputStream extends FSDataOutputStream{
private String path ;
private FSDataOutputStream out ;
private HDFSOutputStreamPool pool ;
public MyFSDataOutputStream(String path , FSDataOutputStream out, HDFSOutputStreamPool pool) throws IOException{
super(out, null); // wrap the real stream (no statistics)
this.path = path ; // remember the path so release() can return the stream to the pool
this.out = out ;
this.pool = pool ;
}
public void close(){
try{
out.close();
}
catch(Exception e){
e.printStackTrace();
}
}
public void hflush() throws IOException {
out.hflush();
}
public void write(byte[] b) throws IOException {
out.write(b);
}
public void hsync() throws IOException {
out.hsync();
}
/**
* Return this stream to the pool
*/
public void release(){
pool.putBack(path, this);
}
}
2. Improve the output stream pool HDFSOutputStreamPool.java.
package com.test.kafkaconsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.HashMap;
import java.util.Map;
/**
* Output stream pool
* Opening and closing a stream for every small write is too costly, so the streams are pooled.
*/
public class HDFSOutputStreamPool {
private FileSystem fs;
//all pooled output streams, keyed by path
private Map<String, FSDataOutputStream> pool = new HashMap<String, FSDataOutputStream>();
private static HDFSOutputStreamPool instance;
private HDFSOutputStreamPool() {
try {
Configuration conf = new Configuration();
fs = FileSystem.get(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
public static HDFSOutputStreamPool getInstance() {
if (instance == null) {
instance = new HDFSOutputStreamPool();
}
return instance;
}
/**
* Take the output stream for the given path out of the pool
*/
public synchronized FSDataOutputStream takeOutputStream(String path) {
try{
FSDataOutputStream out = pool.remove(path);
if(out == null){
Path p = new Path(path);
if(!fs.exists(p)){
fs.createNewFile(p);
}
out = fs.append(p);
}
//wrap it in our decorator stream
return new MyFSDataOutputStream(path,out,this) ;
}
catch(Exception e){
e.printStackTrace();
}
return null ;
}
/**
* Put a stream back into the pool
*/
public synchronized void putBack(String path,FSDataOutputStream out){
pool.put(path,out) ;
}
/**
* Release the pool: close all pooled streams
*/
public synchronized void releasePool(){
try{
for(FSDataOutputStream o : pool.values()){
o.close();
}
pool.clear();
System.out.println("stream pool released");
}
catch(Exception e){
e.printStackTrace();
}
}
}
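A minimal usage sketch of the take/release cycle (the path below is hypothetical): the caller borrows a decorated stream from the pool, writes, and then calls release() so the stream goes back into the pool instead of being closed. Because the underlying appender is reused across take/release cycles, the file is not reopened for every record; the cleanup task created in the next step is what eventually closes it.
package com.test.kafkaconsumer;
/**
 * Hypothetical demo of the pooled output stream life cycle.
 */
public class PoolUsageDemo {
    public static void main(String[] args) throws Exception {
        // hypothetical path; the consumers derive it from the log's hostname and timestamp
        String path = "/user/centos/eshop/raw/2017/02/28/17/35/s201.log";
        MyFSDataOutputStream out =
                (MyFSDataOutputStream) HDFSOutputStreamPool.getInstance().takeOutputStream(path);
        out.write("test line\r\n".getBytes());
        out.hsync();
        // hand the stream back to the pool instead of closing it;
        // CloseFSOuputStreamTask (next step) closes pooled streams periodically
        out.release();
    }
}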
3. Write the periodic pool cleanup task CloseFSOuputStreamTask.java.
package com.test.kafkaconsumer;
import java.util.TimerTask;
/**
* Timer task that periodically closes the pooled output streams
*/
public class CloseFSOuputStreamTask extends TimerTask{
public void run() {
HDFSOutputStreamPool pool = HDFSOutputStreamPool.getInstance();
pool.releasePool();
}
}
4. Rework the HDFS consumer HDFSRawConsumer.java to use the pool.
package com.test.kafkaconsumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* Raw-data HDFS consumer --- copies the log data to HDFS as-is
*/
public class HDFSRawConsumer {
private final ConsumerConnector consumerConn;
private final String topic = "eshop";
public HDFSRawConsumer() {
Properties props = new Properties();
props.put("zookeeper.connect", "s100:2181");
props.put("group.id", "gr1");
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
// create the consumer connector
consumerConn = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
}
/**
* Process the log messages
*/
public void processLog() {
// specify the topic to consume
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, new Integer(1));
// create the message streams
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConn.createMessageStreams(topicCount);
// get the list of streams for the topic
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
//
ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
//iterate over the log messages
MyFSDataOutputStream out = null ;
String prePath = "";
while (consumerIte.hasNext()) {
byte[] msg = consumerIte.next().message();
String log = new String(msg) ;
String[] arr = StringUtil.splitLog(log);
if(arr == null || arr.length < 10){
continue ;
}
//
System.out.println("raw:" + log);
//hostname
String hostname = StringUtil.getHostname(arr);
//date string
String dateStr = StringUtil.formatYyyyMmDdHhMi(arr);
//path
String rawPath = "/user/centos/eshop/raw/" + dateStr + "/" + hostname + ".log";
try{
//check whether the path changed since the previous record
if(!rawPath.equals(prePath)){
if(out != null){
out.release();
out = null ;
}
out = (MyFSDataOutputStream)HDFSOutputStreamPool.getInstance().takeOutputStream(rawPath);
prePath = rawPath ; //
}
//
out.write(msg);
out.write("\r\n".getBytes());
out.hsync();
}
catch(Exception e){
e.printStackTrace();
}
}
}
}
}
5. Create the application entry point ConsumerApp.java.
package com.test.kafkaconsumer;
import java.util.Timer;
/**
* Consumer application entry point
*/
public class ConsumerApp {
public static void main(String[] args) {
//start the timer task that periodically closes the pooled streams
new Timer().schedule(new CloseFSOuputStreamTask(), 0, 30000);
//HDFS raw consumer
new Thread(){
public void run() {
HDFSRawConsumer consumer = new HDFSRawConsumer();
consumer.processLog();
}
}.start();
}
}
6. Delete HDFSWriter.java, since it is no longer used.
IV. Hive data cleaning and loading
-------------------------------------------------------------------
1. Create the database.
$>hive
$hive> create database eshop ;
2. Create the partitioned Hive table.
create external table eshop.logs (
hostname string,
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string
)
partitioned by(year int ,month int,day int,hour int,minute int)
row format DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
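The cleaning consumer later in this section writes its files directly under this table's warehouse directory, one directory level per partition column. A small hypothetical sketch of how a request time maps to such a partition path (assuming the default warehouse location /user/hive/warehouse; the hostname s201 is only an example):
package com.test.kafkaconsumer;
import java.util.Calendar;
/**
 * Hypothetical sketch: map a request time to the eshop.logs partition directory.
 */
public class PartitionPathDemo {
    public static void main(String[] args) {
        Calendar c = Calendar.getInstance();
        c.set(2017, Calendar.FEBRUARY, 28, 17, 35, 2); // made-up request time
        String path = "/user/hive/warehouse/eshop.db/logs/year=" + c.get(Calendar.YEAR)
                + "/month=" + (c.get(Calendar.MONTH) + 1)
                + "/day=" + c.get(Calendar.DAY_OF_MONTH)
                + "/hour=" + c.get(Calendar.HOUR_OF_DAY)
                + "/minute=" + c.get(Calendar.MINUTE)
                + "/s201.log"; // one file per source host
        System.out.println(path);
        // /user/hive/warehouse/eshop.db/logs/year=2017/month=2/day=28/hour=17/minute=35/s201.log
    }
}
Hive only sees these files once a matching partition has been added, which is what the cron job in the following steps takes care of.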
3. Create a cron job on CentOS that periodically adds table partitions.
a. The statement used to add a partition
hive> alter table eshop.logs add partition(year=2017,month=03,day=01,hour=12,minute=12)
b. Check the Hive table partitions in the MySQL metastore
$mysql> select * from hive.PARTITIONS ;
4. Create the partition-adding script.
[/usr/local/bin/addpar.sh]
#!/bin/bash
y=`date +%Y`
m=`date +%m`
d=`date +%d`
h=`date +%H`
mi1=`date +%M`
mi2=`date -d "1 minute" +%M`
mi3=`date -d "2 minute" +%M`
hive -e "alter table eshop.logs add if not exists partition(year=${y},month=${m},day=${d},hour=${h},minute=${mi1})"
hive -e "alter table eshop.logs add if not exists partition(year=${y},month=${m},day=${d},hour=${h},minute=${mi2})"
hive -e "alter table eshop.logs add if not exists partition(year=${y},month=${m},day=${d},hour=${h},minute=${mi3})"
hdfs dfs -chmod -R 777 /user/hive/warehouse
5. Add the scheduled job to /etc/crontab.
SHELL=/bin/bash
PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/bin:/soft/jdk/bin:/soft/hadoop/sbin:/soft/hadoop/bin:/soft/hive/bin
MAILTO=root
# For details see man 4 crontabs
# Example of job definition:
# .---------------- minute (0 - 59)
# | .------------- hour (0 - 23)
# | | .---------- day of month (1 - 31)
# | | | .------- month (1 - 12) OR jan,feb,mar,apr ...
# | | | | .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
# | | | | |
# * * * * * user-name command to be executed
* * * * * centos addpar.sh
6. Test loading data into the Hive table.
$>hive
//LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
$hive>load data inpath 'hdfs://mycluster/user/centos/eshop/raw/2017/02/28/17/35/s203.log' into table eshop.logs partition(year=2017,month=3,day=1,hour=11,minute=31)
7. Extend the utility class StringUtil.java.
package com.test.kafkaconsumer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
/**
* String utility class
*/
public class StringUtil {
/**
* | is a regex metacharacter, so it has to be escaped
*/
private static final String token = "\\|\\|\\|" ;
/**
* Split a single log line
*/
public static String[] splitLog(String log){
String[] arr = log.split(token);
return arr ;
}
public static String getHostname(String[] arr){
return arr[0];
}
/**
* Returns a string like 2017/02/28/12/12
*/
public static String formatYyyyMmDdHhMi(String[] arr){
try {
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
Date d = sdf.parse(arr[3].split(" ")[0]);
SimpleDateFormat localSDF = new SimpleDateFormat("yyyy/MM/dd/HH/mm", Locale.US);
return localSDF.format(d);
}
catch(Exception e){
e.printStackTrace();
}
return null ;
}
/**
* Join an array into a string using the given separator
*/
public static String arr2Str(Object[] arr,String token){
String str = "" ;
for(Object o : arr){
str = str + o + token ;
}
return str.substring(0,str.length() - 1) ;
}
/**
* Parse the log timestamp into a Date object
*/
public static Date str2Date(String[] arr){
try {
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
Date d = sdf.parse(arr[3].split(" ")[0]);
return d ;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
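To illustrate what the cleaning consumer in the next step produces, here is a small hypothetical example: arr2Str joins the split fields with commas so each cleaned line matches the FIELDS TERMINATED BY ',' layout of eshop.logs, and str2Date extracts the request time used to pick the partition. The sample line is made up:
package com.test.kafkaconsumer;
/**
 * Hypothetical demo: turn a "|||"-separated raw line into the comma-separated Hive layout.
 */
public class CleanLineDemo {
    public static void main(String[] args) {
        String log = "s201|||192.168.1.17|||-|||28/Feb/2017:17:35:02 +0800"
                + "|||GET /phone/p1.html HTTP/1.1|||200|||4326|||-|||Mozilla/5.0|||-";
        String[] arr = StringUtil.splitLog(log);
        // one comma-separated column per field of eshop.logs
        System.out.println(StringUtil.arr2Str(arr, ","));
        // request time that determines the year/month/day/hour/minute partition
        System.out.println(StringUtil.str2Date(arr));
    }
}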
8. Create the data-cleaning consumer HiveCleanedConsumer.java.
package com.test.kafkaconsumer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.*;
/**
* Hive data-cleaning consumer
*/
public class HiveCleanedConsumer {
private final ConsumerConnector consumerConn;
private final String topic = "eshop";
public HiveCleanedConsumer() {
Properties props = new Properties();
props.put("zookeeper.connect", "s100:2181");
props.put("group.id", "gr2");
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
// create the consumer connector
consumerConn = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
}
/**
* Process the log messages
*/
public void processLog() {
// specify the topic to consume
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, new Integer(1));
// create the message streams
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConn.createMessageStreams(topicCount);
// get the list of streams for the topic
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
//
ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
//iterate over the log messages
MyFSDataOutputStream out = null ;
String prePath = "";
while (consumerIte.hasNext()) {
byte[] msg = consumerIte.next().message();
String newMsg = null ;
String log = new String(msg) ;
String[] arr = StringUtil.splitLog(log);
//
if(arr == null || arr.length < 10){
continue ;
}
System.out.println("hive : " + log);
//clean the record: keep only requests for .html pages
String request = arr[4];
String[] reqArr = request.split(" ") ;
if(reqArr != null && reqArr.length == 3){
if(reqArr[1].endsWith(".html")){
newMsg = StringUtil.arr2Str(arr,",");
}
else{
continue ;
}
}
else{
continue ;
}
//hostname
String hostname = StringUtil.getHostname(arr);
//
//parse the request time into a Date
Date reqDate = StringUtil.str2Date(arr);
//get a Calendar instance
Calendar c = Calendar.getInstance();
//set the Calendar to the request time
c.setTime(reqDate);
int y = c.get(Calendar.YEAR);
int m = c.get(Calendar.MONTH) + 1;
int d = c.get(Calendar.DAY_OF_MONTH);
int h = c.get(Calendar.HOUR_OF_DAY);
int mi = c.get(Calendar.MINUTE);
//path
String rawPath = "/user/hive/warehouse/eshop.db/logs/year="+y
+ "/month=" + m
+ "/day=" + d
+ "/hour=" + h
+ "/minute=" + mi
+ "/" + hostname+ ".log";
try{
//check whether the path changed since the previous record
if(!rawPath.equals(prePath)){
if(out != null){
out.release();
out = null ;
}
out = (MyFSDataOutputStream)HDFSOutputStreamPool.getInstance().takeOutputStream(rawPath);
prePath = rawPath ; //
}
//
out.write(newMsg.getBytes());
out.write("\r\n".getBytes());
out.hsync();
}
catch(Exception e){
e.printStackTrace();
}
}
}
}
}
9. Write the consumer application entry point; note that each consumer runs in its own thread.
package com.test.kafkaconsumer;
import java.util.Timer;
/**
* Consumer application entry point: runs each consumer in its own thread
*/
public class ConsumerApp {
public static void main(String[] args) {
//start the timer task that periodically closes the pooled streams
new Timer().schedule(new CloseFSOuputStreamTask(), 0, 30000);
//HDFS raw consumer
new Thread(){
public void run() {
HDFSRawConsumer consumer = new HDFSRawConsumer();
consumer.processLog();
}
}.start();
//Hive cleaning consumer
new Thread(){
public void run() {
HiveCleanedConsumer consumer = new HiveCleanedConsumer();
consumer.processLog();
}
}.start();
}
}
V. Package everything into a jar and deploy it to the cluster for testing.