Storm 准备(一)
hadoop解决了分布式存储与计算问题,但是大部分都是进行的离线计算时间周期比较长。企业里急切的希望解决实时的进行大数据分析,如何搭建一个企业的实时数据平台:1.收集数据 flume工具(分级,安全,压缩)2.汇总 flume 消息队列KAFKA(基于硬盘但是速度很快) hadoop集群3.实时处理引擎4.结果存储 mysql mongodb Redis HBase
hadoop解决了分布式存储与计算问题,但是大部分都是进行的离线计算时间周期比较长。企业里急切的希望解决实时的进行大数据分析storm在这样的环境下诞生了(PPT资料)
如何搭建一个企业的实时数据平台:
1.收集数据 flume工具(分级,安全,压缩)
2.汇总 flume 消息队列KAFKA(基于硬盘但是速度很快) hadoop集群
3.实时处理引擎
4.结果存储 mysql mongodb Redis HBase
5.应用
流式处理,KAFKA消息队列来从硬盘级来解决大数据的实时处理,基于硬盘的为啥数据处理还是那么快:分布式的消息队列partition分区是队列形式从队列头部取数据打打减少了寻址过程。
kafka在storm中用来收集汇总数据:zookeeper在kafak中的作用如下图:(协调kafka)
kafka基本组建:
tipic话题:是指特定类型的消息流。
producer 生产者:能够发送消息到话题的任何对象
Broker代理:或者Kafka集群,保存已发布的消息到一组服务器中
consumer消费者:可以订阅多个话题,并从Broker拉取数据,从而消费这些已发送的消息。
kafka的特点:
在kafka中,一个partition中的消息只会背group中的一个consumer消费;每个group中的consumer消息消费相互独立;我们可以认为一个group是一个订阅者,一个tpic中的每个partions,只会被一个订阅者中的consumer消费,不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。从topic角度来说,消息仍然不是有序的。
部署zookeeper的步骤:
第一步解压zookeeper配置conf/zoo.cfg文件
对应代码修改成如下:
# set server
server.1=192.168.145.129:4887:5887
#server.2=cluster-node-02:4887:5887
#server.3=cluster-node-03:4887:5887
#server.4=cluster-node-04:4887:5887
第二步:启动服务,用自带的客户端连接确认是否安装成功
bin/zkServer.sh start
zk客户端:bin/zkCli.sh[root@localhost zookeeper-3.4.6]# vim conf/zoo.cfg
[root@localhost zookeeper-3.4.6]# bin/zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
修改conf/zoo.cfg配置文件中的dataDir、dataLogDir、server.1
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/home/shaka/dep/zookeeper-3.3.6/data (改成自己的目录)
# the port at which the clients will connect
clientPort=2181
# set logs
dataLogDir=/home/shaka/dep/zookeeper-3.3.6/logs (改成自己的目录)
# set server
server.1=hostname:4887:5887 (改成自己的hostname或IP)
#server.2=10.162.219.52:4887:5887
#server.3=10.163.15.119:4887:5887
# add by shaka
# set max client connects
maxClientCnxns=300
[root@localhost zookeeper-3.4.6]# bin/zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[consumers, config, admin, brokers, zookeeper, controller_epoch]
命令行工具的一些简单操作如下:
1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
2. 创建文件,并设置初始内容: create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串
3. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
4. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
5. 删除文件: delete /zk 将刚才创建的 znode 删除
6. 退出客户端: quit
7. 帮助命令: help
第三步:可选 安装zk的ui软件 zkui
首先安装编译工具mvn
配置环境变量:
export MVN_HOME=/usr/local/apache-maven-3.2.1
PATH=$PATH:$HOME/bin:$MVN_HOME/bin
#set java environment
export PATH
到解压的zkui目录下执行:(需要联网下载jar包)
mvn clean install
在zkui下面建立连接文件:
ln -s target/zkui-2.0......jar zkui-2.0....jar
启动zkui
vim config.cfg
zkServer=192.168.145.129:2181
启动:nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
查看:http://123.56.76.213:9090 或者 jps -l
安装部署Kafka:
step1:拷贝软件包
step2:解压
tar -xvf kafka_2.9.2-0.8.1.1.tgz 或者用 unzip kafka_2.9.2-0.8.1.1.zip
,修改配置文件conf/server.properties:
broker.id=0;
host.name=192.169.145.129;
zookeeper.connect=192.168.145.129:2181可逗号分隔配置多个
step3: 确保有执行权限
chmod +x sbin/*
step 修改配置文件vim log4j.properties
log4j.appender.D.File = /data1/home/shaka/kafka/kafka_2.9.2-0.8.1.1/logs/debug.log
log4j.appender.E.File = /data1/home/shaka/kafka/kafka_2.9.2-0.8.1.1/logs/error.log
Step5:启动kafka服务
sbin/start-kafka.sh
查看是否启动: jsp -l
step6.创建topic主题:
bin/kafka-create-topic.sh --zookeeper 192.168.145.129:2181 --replica 1 --partition 1 --topic mykafka
测试例子:
创建topic
bin/kafka-topics.sh --create --zookeeper 192.168.145.129:2181 --replication-factor 1 --partitions 2 --topic topic-ydcun-name (IP不能是localhost)</span>
启动consumer
bin/kafka-console-consumer.sh --zookeeper 192.168.145.129:2181 --topic topic-ydcun
(IP不能localhost)
启动productor
bin/kafka-console-producer.sh --broker-list 192.168.145.129:9092 --topic topic-ydcun
在productor端输入,看consumer端的输出。
用shell程序类来模拟一个生产者
修改logger:
启动 :nohup sh shelllogger.sh >> shelllogger.log 2>&1 &
查看启动: ps -axu|grep shell
kill :
查看: tail -f access.log
shelllogger.sh
#!/bin/sh
# start cmd:
# nohup sh shellcrawler.sh >> shellcrawler.log 2>&1 &
# set timer
g_getTime=""
function getTime
{
g_getTime=`date '+%Y%m%d %H:%M:%S'`
}
#getTime && echo "[$g_getTime] [$0:$LINENO:$FUNCNAME] - "
# set function
function crawler
{
int=1
while(( $int<=1000000000 ))
do
log="insert into test(name,content) values('ydcun','hello word');"
let "int++"
echo $log >> access.log
sleep 1s
#usleep 1000
done
}
# main
crawler
启动looger
nohup tail -f ../logger/access.log | bin/kafka-console-producer.sh --broker-list 192.168.145.129:9092 --topic topic-ydcun > logs/producer.log 2>&1 &
更多推荐
所有评论(0)