数据采集平台(二)
使用flume很方便,架构方面source, channel,sink不是分布式的,没有高可用。但是如果结合kafka channel之后还是不错的。
·
5. 安装Kafka
5.1 基础架构
- 为方便扩展,并提高吞吐量,一个topic分为多个partition
- 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
- 为提高可用性,为每个partition增加若干副本,类似NameNode HA
5.2 安装步骤
- 解压kafka文件,改为缩略名
- 进入kafka/config文件,修改配置文件:
- myid和brokerID对应
- connect
- kafka日志存放地址
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop102:9092
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
- 配置kafka环境变量
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
- 向不存在的主题发送数据时,kafka会自己创建该主题,是单副本和单分区的主题。
- 测试kafka能否正常运行:
- 发送消息:
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
- 消费消息:
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
- 发送消息:
- 编写一键启停脚本
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac
6. 安装flume
6.1 概述
使用flume很方便,架构方面source, channel,sink不是分布式的,没有高可用。但是如果结合kafka channel之后还是不错的。
6.2 安装步骤
- 解压flume安装包并改名
- 修改conf/log4j文件,修改LOG_DIR路径,改为/opt/module/flume/log
- 让日志同时打印在控制台,添加参数
# 引入控制台输出,方便学习查看日志
<Root level="INFO">
<AppenderRef ref="LogFile" />
<AppenderRef ref="Console" />
</Root>
6.3 选择source、channel、sink
官网地址:flume.apache.org, 可以通过官网查找对应的配置参数。
- source: taildir source
- 日志文件地址:filegroups.f1
- 偏移量文件地址:positionFile
- channel:Kafka Channel
- kafka.topic主题名字:默认为flume-channel,建议修改为topic_log
- parseAsFlumeEvent : 修改为false, 不转换成flume事件格式
- sink:kafka sink, 由于kafka channel是channel和sink一体的,实际上不需要配置sink。
#定义组件
a1.sources = r1
a1.channels = c1
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#组装
a1.sources.r1.channels = c1
- 编写Flume启停脚本
nohup ... &
: 表示进程不会随着窗口的关闭而消失,即后台运行或者挂起了。>/dev/null
将控制台打印信息放到黑洞中,日志可以在flume/log目录下查看即可。2>&1
: 将正确输出流和错误输出流合并ps -ef
: 打印进程状态,ef表示父子关系
#!/bin/bash
case $1 in
"start"){
echo " --------启动 hadoop102 采集flume-------"
ssh hadoop102 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
};;
"stop"){
echo " --------停止 hadoop102 采集flume-------"
ssh hadoop102 "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
};;
esac
7. 电商业务流程
7.1 用户购物流程
- 首页
- 三级分类索引
- 全文检索
- 营销活动
- 商品详情
- 购物车
- SSO单点登录:浏览器token保存
- 下单结算
- 第三方支付
- 售后管理
- 评价
- 退货退款
- 物流服务
- 客户服务
- 库存服务
7.2 电商常识
- sku:商品库存量基本单位,产品统一编号。
- spu: 商品名称聚合最小单位,易于复用、检索。
- UV:人次,不管是否重复
- PV:人数,同一个人只算一次
7.3 电商系统表结构
- 活动相关表,主键为aciivity_id
- 活动信息表
- 活动规则表
- 活动商品关联表
- 平台相关表,就是搜索商品后出现的属性
- 平台属性表,一级索引
- 平台属性值表,二级索引
- 营销坑位表
- 营销渠道表
- 字典表:用于替换解释某些字段
- 分类相关表:
- 一级分类表
- 二级分类表
- 三级分类表
- 用户表
- 用户信息表
- 用户地址表
- 订单相关表
- 订单表
- 订单明细表
- 退单表
- 订单明细活动关联表
- 订单明细优惠券关联表
- 订单状态流水表
- 省份表/地区表
- 品牌表
- 购物车表
- 评价表
- 优惠券相关表
- 优惠券信息表
- 优惠券范围表
- 优惠券领用表
- 支付相关表
- 支付表
- 退款表
MySQL安装
- 下载MySQL安装包
- 去除CentOS系统中自带的mariadb依赖
- 安装libaio
- 切换为root用户,su root
- 修改/etc/my.cnf文件,降低密码级别
- 获取临时密码,进入MySQL命令行
- 设置MySQL密码
- 开放权限, 让外部主机也能访问,mysql默认只能本机访问,并刷新flush privileges
- 修改外界访问密码也为
000000
#!/bin/bash
set -x
[ "$(whoami)" = "root" ] || exit 1
[ "$(ls *.rpm | wc -l)" = "7" ] || exit 1
test -f mysql-community-client-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-client-plugins-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-common-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-icu-data-files-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-libs-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-libs-compat-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-server-8.0.31-1.el7.x86_64.rpm || exit 1
# 卸载MySQL
systemctl stop mysql mysqld 2>/dev/null
rpm -qa | grep -i 'mysql\|mariadb' | xargs -n1 rpm -e --nodeps 2>/dev/null
rm -rf /var/lib/mysql /var/log/mysqld.log /usr/lib64/mysql /etc/my.cnf /usr/my.cnf
set -e
# 安装并启动MySQL
yum install -y *.rpm >/dev/null 2>&1
systemctl start mysqld
#更改密码级别并重启MySQL
sed -i '/\[mysqld\]/avalidate_password.length=4\nvalidate_password.policy=0' /etc/my.cnf
systemctl restart mysqld
# 更改MySQL配置
tpass=$(cat /var/log/mysqld.log | grep "temporary password" | awk '{print $NF}') # NF为awk切分时的列的个数,$NF为最后一列
cat << EOF | mysql -uroot -p"${tpass}" --connect-expired-password >/dev/null 2>&1
set password='000000';
update mysql.user set host='%' where user='root';
alter user 'root'@'%' identified with mysql_native_password by '000000';
flush privileges;
EOF
更多推荐
已为社区贡献3条内容
所有评论(0)