hadoop解决了分布式存储与计算问题,但是大部分都是进行的离线计算时间周期比较长。企业里急切的希望解决实时的进行大数据分析storm在这样的环境下诞生了(PPT资料

如何搭建一个企业的实时数据平台:

1.收集数据  flume工具(分级,安全,压缩)
2.汇总 flume    消息队列KAFKA(基于硬盘但是速度很快) hadoop集群
3.实时处理引擎
4.结果存储 mysql  mongodb  Redis HBase
5.应用

流式处理,KAFKA消息队列来从硬盘级来解决大数据的实时处理,基于硬盘的为啥数据处理还是那么快:分布式的消息队列partition分区是队列形式从队列头部取数据打打减少了寻址过程。


kafka在storm中用来收集汇总数据:zookeeper在kafak中的作用如下图:(协调kafka)



kafka基本组建:

tipic话题:是指特定类型的消息流。

producer 生产者:能够发送消息到话题的任何对象

Broker代理:或者Kafka集群,保存已发布的消息到一组服务器中

consumer消费者:可以订阅多个话题,并从Broker拉取数据,从而消费这些已发送的消息。


kafka的特点:

1. 消息保存在磁盘, O(1) 时间复杂度
2. 不使用内存?使用磁盘缓存
3. 消费状态保存在消费客户端
4. 可以保存足够大的未处理消息

在kafka中,一个partition中的消息只会背group中的一个consumer消费;每个group中的consumer消息消费相互独立;我们可以认为一个group是一个订阅者,一个tpic中的每个partions,只会被一个订阅者中的consumer消费,不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。从topic角度来说,消息仍然不是有序的。



部署zookeeper的步骤:

第一步解压zookeeper配置conf/zoo.cfg文件

对应代码修改成如下:
# set server
server.1=192.168.145.129:4887:5887
#server.2=cluster-node-02:4887:5887
#server.3=cluster-node-03:4887:5887
#server.4=cluster-node-04:4887:5887


第二步:启动服务,用自带的客户端连接确认是否安装成功

bin/zkServer.sh start 
zk客户端:bin/zkCli.sh[root@localhost zookeeper-3.4.6]# vim conf/zoo.cfg 
[root@localhost zookeeper-3.4.6]# bin/zkServer.sh start   
JMX enabled by default
Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
修改conf/zoo.cfg配置文件中的dataDir、dataLogDir、server.1
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/home/shaka/dep/zookeeper-3.3.6/data  (改成自己的目录)
# the port at which the clients will connect
clientPort=2181
# set logs
dataLogDir=/home/shaka/dep/zookeeper-3.3.6/logs  (改成自己的目录)
# set server
server.1=hostname:4887:5887   (改成自己的hostname或IP)
#server.2=10.162.219.52:4887:5887
#server.3=10.163.15.119:4887:5887
# add by shaka
# set max client connects
maxClientCnxns=300
[root@localhost zookeeper-3.4.6]# bin/zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[consumers, config, admin, brokers, zookeeper, controller_epoch]
命令行工具的一些简单操作如下:
    1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
    2. 创建文件,并设置初始内容: create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串
    3. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
    4. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
    5. 删除文件: delete /zk 将刚才创建的 znode 删除
    6. 退出客户端: quit
    7. 帮助命令: help

第三步:可选 安装zk的ui软件 zkui

首先安装编译工具mvn  
配置环境变量:
export MVN_HOME=/usr/local/apache-maven-3.2.1
PATH=$PATH:$HOME/bin:$MVN_HOME/bin
#set java environment
export PATH

到解压的zkui目录下执行:(需要联网下载jar包)
mvn clean install
在zkui下面建立连接文件:  
ln -s target/zkui-2.0......jar zkui-2.0....jar

启动zkui
vim config.cfg 
zkServer=192.168.145.129:2181

启动:nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
查看:http://123.56.76.213:9090  或者    jps -l


安装部署Kafka:


step1:拷贝软件包

step2:解压
     tar -xvf kafka_2.9.2-0.8.1.1.tgz    或者用 unzip  kafka_2.9.2-0.8.1.1.zip
,修改配置文件conf/server.properties:
broker.id=0;
host.name=192.169.145.129;
zookeeper.connect=192.168.145.129:2181可逗号分隔配置多个

step3: 确保有执行权限
chmod +x sbin/* 
step 修改配置文件vim log4j.properties
log4j.appender.D.File = /data1/home/shaka/kafka/kafka_2.9.2-0.8.1.1/logs/debug.log
log4j.appender.E.File = /data1/home/shaka/kafka/kafka_2.9.2-0.8.1.1/logs/error.log


Step5:启动kafka服务
sbin/start-kafka.sh 
查看是否启动: jsp -l
step6.创建topic主题:
bin/kafka-create-topic.sh --zookeeper 192.168.145.129:2181 --replica 1 --partition 1 --topic mykafka

测试例子:
创建topic
bin/kafka-topics.sh --create --zookeeper 192.168.145.129:2181 --replication-factor 1 --partitions 2 --topic topic-ydcun-name  (IP不能是localhost)</span>
启动consumer
bin/kafka-console-consumer.sh --zookeeper 192.168.145.129:2181 --topic topic-ydcun
(IP不能localhost)
启动productor
bin/kafka-console-producer.sh --broker-list 192.168.145.129:9092 --topic topic-ydcun
在productor端输入,看consumer端的输出。

用shell程序类来模拟一个生产者

修改logger:
启动 :nohup sh shelllogger.sh >> shelllogger.log 2>&1 &
查看启动: ps -axu|grep shell
kill :
查看: tail -f access.log

shelllogger.sh

#!/bin/sh

# start cmd:
# nohup sh shellcrawler.sh  >> shellcrawler.log 2>&1 &
# set timer
g_getTime=""
function getTime
{
        g_getTime=`date  '+%Y%m%d %H:%M:%S'`
}
#getTime && echo "[$g_getTime] [$0:$LINENO:$FUNCNAME] - "

# set function
function crawler
{
	int=1
	while(( $int<=1000000000 ))
	do
	log="insert into test(name,content) values('ydcun','hello word');"
	let "int++"
        echo $log >> access.log 
        sleep 1s
        #usleep 1000
	done
}


# main
 
 crawler


启动looger

 nohup tail -f ../logger/access.log | bin/kafka-console-producer.sh --broker-list 192.168.145.129:9092 --topic topic-ydcun > logs/producer.log 2>&1 &




Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐