AOP切面编程& 埋点 将用户行为存储到数据库或者发送到kafka进行spark处理
一、基于注解的Spring AOP的配置和使用<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:conte...
一、基于注解的Spring AOP的配置和使用
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:annotation-config/>
<!--1、自动扫描所有的组件 aop的注解@Befor,@After -->
<context:component-scan base-package="com.aop"></context:component-scan>
<!--2、开启基于注解的aop功能 -->
<aop:aspectj-autoproxy></aop:aspectj-autoproxy>
</beans>
package cn.aop.service;
public class UserService {
private final static Log log = LogFactory.getLog(UserService.class);
public User get(long id){
if(log.isInfoEnabled()){
log.info("getUser method . . .");
}
return new User();
}
…………
}
package cn.aop.aspect;
/**
* 系统服务组件Aspect切面Bean
*/
//声明这是一个组件
@Component
//声明这是一个切面Bean
@Aspect
public class ServiceAspect {
private final static Log log = LogFactory.getLog(ServiceAspect.class);
public static final String EDP = ""execution(* cn.aop.service..*(..))"";
//配置切入点,该方法无方法体,主要为方便同类中其他方法使用此处配置的切入点
@Pointcut("execution(* cn.aop.service..*(..))")
public void aspect(){ }
/*
* 配置前置通知,使用在方法aspect()上注册的切入点
* 同时接受JoinPoint切入点对象,可以没有该参数
*/
@Before("aspect()")
public void before(JoinPoint joinPoint){
if(log.isInfoEnabled()){
log.info("before " + joinPoint);
}
}
@After("execution(* com.service.impl.UserService.*(..))")
public void afterEat() {
System.out.println("----------afterEat------后置增强------------------");
}
@After(value = EDP) //也可以用这种方式
public void after(JoinPoint joinPoint){
if(log.isInfoEnabled()){
log.info("after " + joinPoint);
System.out.println("访问埋点菜单后,往数据库插入一条记录");
}
}
//配置环绕通知,使用在方法aspect()上注册的切入点
@Around("aspect()")
public void around(JoinPoint joinPoint){
long start = System.currentTimeMillis();
try {
((ProceedingJoinPoint) joinPoint).proceed();
long end = System.currentTimeMillis();
if(log.isInfoEnabled()){
log.info("around " + joinPoint + "\tUse time : " + (end - start) + " ms!");
}
} catch (Throwable e) {
long end = System.currentTimeMillis();
if(log.isInfoEnabled()){
log.info("around " + joinPoint + "\tUse time : " + (end - start) + " ms with exception : " + e.getMessage());
}
}
}
//配置后置返回通知,使用在方法aspect()上注册的切入点
@AfterReturning("aspect()")
public void afterReturn(JoinPoint joinPoint){
if(log.isInfoEnabled()){
log.info("afterReturn " + joinPoint);
}
}
//配置抛出异常后通知,使用在方法aspect()上注册的切入点
@AfterThrowing(pointcut="aspect()", throwing="ex")
public void afterThrow(JoinPoint joinPoint, Exception ex){
if(log.isInfoEnabled()){
log.info("afterThrow " + joinPoint + "\t" + ex.getMessage());
}
}
}
测试代码:
package cn.ysh.studio.spring.aop;
/**
* Spring AOP测试
* @author Shenghany
* @date 2013-5-28
*/
public class Tester {
private final static Log log = LogFactory.getLog(Tester.class);
public static void main(String[] args) {
//启动Spring容器
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
//获取service组件
UserService service = (UserService) context.getBean("userService");
//以普通的方式调用UserService对象的三个方法
User user = service.get(1L);
service.save(user);
try {
service.delete(1L);
} catch (Exception e) {
if(log.isWarnEnabled()){
log.warn("Delete user : " + e.getMessage());
}
}
}
}
控制台输出如下:
INFO [spring.aop.aspect.ServiceAspect:40] before execution(User cn.ysh.studio.spring.aop.service.UserService.get(long))
INFO [spring.aop.service.UserService:19] getUser method . . .
INFO [spring.aop.aspect.ServiceAspect:60] around execution(User cn.ysh.studio.spring.aop.service.UserService.get(long)) Use time : 42 ms!
INFO [spring.aop.aspect.ServiceAspect:48] after execution(User cn.ysh.studio.spring.aop.service.UserService.get(long))
INFO [spring.aop.aspect.ServiceAspect:74] afterReturn execution(User cn.ysh.studio.spring.aop.service.UserService.get(long))
INFO [spring.aop.aspect.ServiceAspect:40] before execution(void cn.ysh.studio.spring.aop.service.UserService.save(User))
INFO [spring.aop.service.UserService:26] saveUser method . . .
INFO [spring.aop.aspect.ServiceAspect:60] around execution(void cn.ysh.studio.spring.aop.service.UserService.save(User)) Use time : 2 ms!
INFO [spring.aop.aspect.ServiceAspect:48] after execution(void cn.ysh.studio.spring.aop.service.UserService.save(User))
INFO [spring.aop.aspect.ServiceAspect:74] afterReturn execution(void cn.ysh.studio.spring.aop.service.UserService.save(User))
INFO [spring.aop.aspect.ServiceAspect:40] before execution(boolean cn.ysh.studio.spring.aop.service.UserService.delete(long))
INFO [spring.aop.service.UserService:32] delete method . . .
INFO [spring.aop.aspect.ServiceAspect:65] around execution(boolean cn.ysh.studio.spring.aop.service.UserService.delete(long)) Use time : 5 ms with exception : spring aop ThrowAdvice演示
INFO [spring.aop.aspect.ServiceAspect:48] after execution(boolean cn.ysh.studio.spring.aop.service.UserService.delete(long))
INFO [spring.aop.aspect.ServiceAspect:74] afterReturn execution(boolean cn.ysh.studio.spring.aop.service.UserService.delete(long))
WARN [studio.spring.aop.Tester:32] Delete user : Null return value from advice does not match primitive return type for: public boolean cn.ysh.studio.spring.aop.service.UserService.delete(long) throws java.lang.Exception
二、基于用配置文件的方式的Spring AOP的配置和使用
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<bean id="loggingAspect" class="com.ssm.aop.LoggingAspect"/>
<!-- 配置AOP -->
<aop:config>
<!-- 配置切点表达式 -->
<aop:pointcut expression="execution(* com.ssm.service.*.*(..))" id="pointcut"/>
<!-- 配置切面和通知 -->
<aop:aspect ref="loggingAspect" order="2">
<aop:before method="beforeMethod" pointcut-ref="pointcut"/>
<aop:after method="afterMethod" pointcut-ref="pointcut"/>
<aop:after-throwing method="afterThrowingMethod" pointcut-ref="pointcut" throwing="ex"/>
<aop:after-returning method="afterReturningMethod" pointcut-ref="pointcut" returning="result"/>
</aop:aspect>
</beans>
package com.ssm.aop;
public class LoggingAspect {
// 声明该方法为一个前置通知:在目标方法开始之前执行
public void beforeMethod(JoinPoint joinpoint) {
String methodName = joinpoint.getSignature().getName();
List<Object> args = Arrays.asList(joinpoint.getArgs());
System.out.println("before method " + methodName + " with " + args);
}
// 声明该方法为一个后置通知:在目标方法结束之后执行(无论方法是否抛出异常)。
// 但是因为当方法抛出异常时,不能返回值为null,因此这里无法获取到异常值。
public void afterMethod(JoinPoint joinpoint) {
String methodName = joinpoint.getSignature().getName();
List<Object> args = Arrays.asList(joinpoint.getArgs());
System.out.println("after method " + methodName);
}
/**
* 声明该方法为一个返回通知:在目标方法返回结果时后执行(当方法抛出异常时,无法执行)
*/
public void afterReturningMethod(JoinPoint joinpoint, Object result) {
String methodName = joinpoint.getSignature().getName();
List<Object> args = Arrays.asList(joinpoint.getArgs());
System.out.println(
"after method " + methodName + " with returning " + (result == null ? "NULL" : result.toString()));
}
/**
* 定义一个异常通知函数: 只有在方法跑吹异常时,该方法才会执行,而且可以接受异常对象。
*/
public void afterThrowingMethod(JoinPoint joinpoint, Exception ex) {
String methodName = joinpoint.getSignature().getName();
List<Object> args = Arrays.asList(joinpoint.getArgs());
System.out.println(
"after method " + methodName + " occurs exception: " + (ex == null ? "NULL" : ex.getMessage()));
}
public Object aroundMethod(ProceedingJoinPoint pJoinPoint) throws Exception {
String methodName = pJoinPoint.getSignature().getName();
List<Object> args = Arrays.asList(pJoinPoint.getArgs());
Object result = null;
try {
// 前置通知
System.out.println("before method " + methodName + " with " + args);
// 执行目标方法
result = pJoinPoint.proceed();
// 返回通知
System.out.println("after method " + methodName + " returning " + result);
} catch (Throwable ex) {
// 异常通知
System.out.println("after method " + methodName + " occurs exception: " + ex.getMessage());
throw new Exception(ex);
}
// 后置通知
System.out.println("after method " + methodName);
return result;
}
public void afterThrowing(JoinPoint joinPoint) throws Throwable {
}
}
AOP应用一:
AOP它是可以通过预编译方式和运行期动态代理实现在不修改源代码的情况下给程序动态统一添加功能的一种技术。它是一种新的方法论,它是对传统OOP编程的一种补充。
AOP应用二:
埋点数据(相当于日志记录,),埋点主要是记录用户的操作,而用户的操作就是按钮的点击和列表的选择
服务端埋点。服务端埋点就是通过拦截用户的请求接口,对用户的一些行为信息进行采集
因为只有做了用户行为分析才能知道用户画像、才能知道用户在网站上的各种浏览、点击、购买背后的商业真相,从而给企业带来商业价值。
推荐系统。目前的推荐是基于用户的行为,然后运用不同的算法计算出用户应该展现的推荐数据。
先用到切面编程将访问_menu的方法后都执行切面(after的方法),即保存一条记录到user_behave_log
流量概况在一天内被点击了多少次的sql
SELECT
ubl.menuId,ubl.menuName,COUNT(1)
FROM
user_behave_log ubl
LEFT JOIN aspnet_users au ON ubl.userId = au.UserId
WHERE
au.UserName = 'huikemall'
AND ubl.date between '2017-09-01 00:00:00' AND '2017-09-01 23:59:59'
GROUP BY ubl.menuName;
@SuppressWarnings("unchecked")
@RequestMapping("/indexLeader")
public String indexLeader_menu(String belongString,String firstcom,ModelMap model,String siteId,HttpServletRequest request,HttpServletResponse response)throws Exception{
//将mapping和下面的方法不一致,加_menu即进行了拦截
//menuId和menuName从静态map获取,
}
@After("joinpoint2()")
public void record2(JoinPoint jp){
Signature signature = jp.getSignature();
signature.getName(); //one_menu
//从session获得userId, userName, source以及相应的转化处理逻辑
UserInfo userForMenu = (UserInfo) session.getAttribute("sessionUserInfo");
userForMenu.getUserId();
userForMenu.getSource() ;
userForMenu.getUserName();
System.out.println("访问埋点菜单后,往数据库插入一条记录");
//将信息组装发送到Kafka的Broker中,此时消息生产者
}
-
服务端埋点。服务端埋点就是通过拦截用户的请求接口,对用户的一些行为信息进行采集。
服务端日志采集
下面介绍下网易美学的服务端日志采集系统使用的技术,总体架构,部署图,运作流程以及配置相关信息。
1. 技术选型
服务端日志采集主要通过在Controller的接口中进行埋点,然后通过AOP技术、Kafka消息系统以及logback对用户行为进行采集。
之所以使用AOP技术是因为AOP的以下重要特定:
-
代码的侵入性小。对于业务代码的侵入性小,只需要在Controller的接口上添加注解,然后在其他模块对用户行为进行采集。
-
重用性。对于相同作用的代码可以进行重用。
-
扩展性。能够很好的对系统进行扩展。
由于使用异步方式对用户行为信息进行收集,因此需要使用消息中间件。目前消息中间件非常多,比较流行的有ActiveMQ、ZeroMQ、RabbitMQ、Kafka等。每个消息中间件都有各种的优势劣势,之所以使用Kafka消息中间件,是因为以下几点因素:
-
高性能。每秒钟可以处理数以千计生产者生成的消息。
-
高扩展性。可以通过简单的增加服务器横向扩展Kafka集群的容量。
-
分布式。消息来自数以千计的服务,使用分布式来解决单机处理海量数据的瓶颈。
-
持久性。Kafka中的消息可以持久化到硬盘上,这样可以防止数据的丢失。
因为用户的行为数据最终是以日志的形式持久化的,因此使用logback对日志持久化到日志服务器中。
2.总体架构
服务端日志采集系统主要由两个工程组成:beauty-bi-core和beauty-bi-service。由于网易美学的使用dubbo框架,因此有服务提供方和服务消费方。beauty-bi-core被web、wap和mainsite服务消费方依赖。此外,beauty-bi-service也依赖于beauty-bi-core,主要是依赖于其中的一些实体类及工具类。
beauty-bi-core工程为Kafka消息的生产者,主要封装实现切面的具体逻辑,其主要职责如下:
-
解析用户请求的Request信息:从Request中提取用户的基本信息,如设备型号、用户的供应商、ip、设备的分辨率、设备平台、设备的操作系统、设备id、app渠道等。
-
接口对应的参数:通过切面可以提取接口的参数值,从而知道用户的业务信息。
-
应用层返回的结果信息:因为切面使用AfterReturning方式,因此可以获取用层的返回结果,从返回结果中可以提取有用的信息。
-
用户的基本信息:用户的id信息。
-
信息格式化:将信息转化成JSON字符串。
-
发送消息:将最终需要发送的消息放入本地阻塞队列中,通过另一个线程异步从阻塞队列中获取消息并发送到Kafka Broker中。
beauty-bi-service工程为Kafka消息的消费者,其主要职责如下:
-
实时从Kafka中拉取最新的数据。
-
将JSON字符串转化成,方便进一步对用信息进行加工。
-
对用户的ip进行解析,获取ip对应的地区以及经纬度信息。
-
将加工好的最终信息持久化到log文件中。
3.部署图
上图为网易美学与日志系统系统相关的部署图,App、Wap和Mainsite服务器集群分别对应不同终端的应用。Kafka集群使用杭研的集群,目前有10个Broker。日志服务器有两台,通过Kafka的均衡策略对日志进行消费。
4.日志采集的流程
日志采集流程图如下所示:
上图为消息生产者和消息消费者共同组成的流程图。
消息生产者的具体步骤如下:
-
通过切面拦截用户的请求。
-
从切面中提取请求头的基本信息,如设备信息,cookie信息,ip信息等。
-
提取请求的接口参数信息。
-
从接口返回值中提取相关信息,如id,pvid等。
-
将提取的信息封装成JSON字符串,放到阻塞队列中,假如阻塞队列溢出会有三次重试机制。
-
异步线程从本地阻塞队列中获取数据,并将信息组装发送到Kafka的Broker中,此时消息生产者结束。
消息消费者的具体步骤如下:
-
实时从Kafka Broker中批量拉取消息。
-
将拉取的消息转化成对象。
-
解析ip对应的国家、省份、城市、经纬度信息。
-
对不同业务场景的信息进一步解析。
-
将日志信息转化成JSON字符串,持久化到log文件中。
5. 相关配置
-
application-XXX.properties:该配置放Kafka的相关属性,包括topic、groupId、server等信息。
-
beauty-log-msg.xml:该配置放在app-web,mainsite-web,wap-web的src/main/resources目录下,主要是初始化kafka生产者的信息。
-
beauty-bi-service.xml:该配置放在beauty-bi-service工程的src/main/resources目录下,主要用于加载kafka消费者的配置信息,并且启动kafka消费者服务。
-
logback.xml:该配置放在beauty-bi-service工程的src/main/resources目录下,主要用于声明日志文件存放的目录,需要持久化的日志的package路径,以及日志持久化的格式。
-
ip_conf.txt:该配置放在beauty-bi-service工程的src/main/resources目录下,用于解析ip对应的地域、经纬度等信息。
更多推荐
所有评论(0)