Reading HBase Data with Flink SQL
Overview
A recent project used Flink for real-time computation. The pipeline reads data from Kafka: dimension data is written to HBase, while data that needs real-time computation is processed and the results are saved to MySQL. During the real-time computation the dimension data stored in HBase may be needed, and for development efficiency the implementation uses Flink SQL. Flink SQL is a high-level abstraction on top of Flink's streaming engine that makes development simpler and faster, but understanding what happens behind the SQL still requires a careful study of Flink streaming itself. This article implements the requirements mainly with Flink SQL.
Assume the requirements are as follows:
- User data is read from Kafka and saved to HBase as dimension data.
- Product data is read from Kafka and saved to HBase as dimension data.
- Order data is read from Kafka, computed in real time, and the results are saved to MySQL (the orders represent successfully paid orders).
Assume we need to produce per-user, per-day order statistics.
User data: userID, userName, sex, address
Product data: productID, productName, productType
Order data: orderID, userID, productID, productCount, money, buyDate (money is the unit price of the product)
Final result: userName, productType, productName, buyDate, productCount, totalMoney (total quantity and total amount spent)
In other words: for each user, purchase date, product type and product name, compute how many items were bought and how much money was spent.
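To make this concrete, assume the Kafka messages are JSON objects whose keys match the field names above (the topic names and values below are made up purely for illustration):
user topic: {"userID":1,"userName":"Tom","sex":"male","address":"Beijing"}
product topic: {"productID":100,"productName":"iPhone","productType":"phone"}
order topic: {"orderID":1000,"userID":1,"productID":100,"productCount":2,"money":5000.0,"buyDate":"2020-05-01"}
With only this data, the expected result row would be: Tom, phone, iPhone, 2020-05-01, 2, 10000.0 (totalMoney = money * productCount).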
Maven dependencies
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>1.10.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
<version>1.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client_2.11</artifactId>
<version>1.10.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hive</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.21</version>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
<version>2.2</version>
</dependency>
</dependencies>
Entity classes
@Data
public class User implements Serializable {
private Integer userID;
private String userName;
private String sex;
private String address;
}
@Data
public class Product implements Serializable {
private Integer productID;
private String productName;
private String productType;
}
@Data
public class Order implements Serializable {
private Integer orderID;
private Integer userID;
private Integer productID;
private Integer productCount;
private Double money;
private String buyDate;
}
@Data
public class Result implements Serializable {
private String userName;
private String productType;
private String productName;
private String buyDate;
private Integer productCount;
private Double totalMoney;
}
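Note that the @Data annotation on these classes comes from Lombok, which is not in the dependency list above. If the classes are used as-is, a Lombok dependency roughly like the following would need to be added (the version here is only an example), or the getters/setters can be written out by hand so Flink can treat the classes as POJOs:
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>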
Creating the execution environment
public class Main {
public static void main(String[] args)throws Exception{
//1. Parameter setup
//Create the stream execution environment and the table execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Required if you want to use the Table/SQL API
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
//Very important: enable checkpointing and choose the processing guarantee (exactly-once here)
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
//Compute based on processing time
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//Checkpoint retention policy
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setTolerableCheckpointFailureNumber(0);
//Restart strategy: 3 attempts, 60s between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000));
Map<String, String> maps = new ImmutableMap.Builder<String, String>().
put("kafka.url", args[0]).
put("hbase.zookeeper.quorum", args[1]).
put("dbUrl", args[2]).
put("dbName", args[3]).
put("user", args[4]).
put("psw", args[5]).build();
//Store these values as global job parameters so they can be read later inside the functions
env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(maps));
//Time conversion function used in the Table API / SQL
tEnv.registerFunction("time_format", new DateFormatFunction());
//2. HBase setup
//Read the HBase ZooKeeper quorum from the global configuration
String zkUrl = env.getConfig().getGlobalJobParameters().toMap().getOrDefault("hbase.zookeeper.quorum", "");
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zkUrl);
//2.1 Register the HBase tables
TableDefine.defineUserHbaseTable(tEnv, conf);//user table
TableDefine.defineProductHbaseTable(tEnv, conf);//product table
//3. Kafka setup
//3.1 Read the Kafka configuration from the global configuration
String kafkaUrl = env.getConfig().getGlobalJobParameters().toMap().getOrDefault("kafka.url", "");
Properties props = new Properties();
props.setProperty("bootstrap.servers",kafkaUrl);
props.setProperty("group.id", "your consumer group id");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//3.2 Dimension data written straight to HBase and data that needs real-time computation are handled by separate jobs, since they differ slightly
//3.2.1 Dimension data
JobDefine.userJob(env, props);
JobDefine.productJob(env, props);
//Real-time computation
JobDefine.orderJob(env,tEnv,props);
//Execute the job
env.execute("orderJob");
}
}
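The job expects six positional arguments, in the order they are read above. A hypothetical launch (host names, database name, jar name and credentials below are placeholders) might look like:
flink run -c Main flink-job.jar kafka-broker:9092 zk-host:2181 mysql-host:3306 order_db db_user db_password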
Converting the JSON strings read from Kafka into Java entity objects
public class MyKafkaRichFlatMapFunction<OUT> extends RichFlatMapFunction<String, OUT> {
private static Logger logger = LogManager.getLogger(MyKafkaRichFlatMapFunction.class);
//The target class is passed in explicitly: resolving OUT via getGenericSuperclass() only works
//when the function is created as an anonymous subclass, so the explicit Class is more reliable here.
private final Class<OUT> tClass;
public MyKafkaRichFlatMapFunction(Class<OUT> tClass) {
this.tClass = tClass;
}
@Override
public void flatMap(String value, Collector<OUT> collector) {
try {
if(value != null && !"".equals(value)){
OUT out = JSONObject.parseObject(value, tClass);
collector.collect(out);
}
} catch (Exception e) {
logger.error("MyKafkaRichFlatMapFunction failed to parse message: " + value, e);
}
}
}
Time conversion function used in SQL
public class DateFormatFunction extends ScalarFunction {
public String eval(Timestamp time, String format) {
return new SimpleDateFormat(format).format(new Date(time.getTime()));
}
}
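The time_format function is registered in Main but does not appear in the final query shown later; it is there for cases where a TIMESTAMP column needs to be rendered as a string. For example, if the order stream carried a hypothetical payTime TIMESTAMP field, it could be used roughly like this:
select time_format(o.payTime, 'yyyy-MM-dd') as buyDate from t_order o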
HBase table definitions
public class TableDefine {
public static void defineUserHbaseTable(StreamTableEnvironment tEnv,Configuration conf){
HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_user");
//Set the HBase table's rowkey and its type
hBaseTableSource.setRowKey("rowKey", Integer.class);
//Declare the HBase columns and their types: first argument is the column family, second is the column name (kept short to save storage and speed up access), third is the type
hBaseTableSource.addColumn("f", "uid", Integer.class);
hBaseTableSource.addColumn("f", "uname", String.class);
hBaseTableSource.addColumn("f", "sex", String.class);
hBaseTableSource.addColumn("f", "address", String.class);
//Register the lookup function with the table environment
// The first argument is the registered function name, i.e. the table name used in Flink SQL, while the t_user in new HBaseTableSource(conf, "t_user") is the HBase table name
// The second argument is a TableFunction whose result contains the columns declared via addColumn; its parameter is the rowkey, i.e. lookups are done by rowkey (the userID)
tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
}
public static void defineProductHbaseTable(StreamTableEnvironment tEnv,Configuration conf){
HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_product");
//Set the HBase table's rowkey and its type
hBaseTableSource.setRowKey("rowKey", Integer.class);
//Declare the HBase columns and their types: first argument is the column family, second is the column name (kept short to save storage and speed up access), third is the type
hBaseTableSource.addColumn("f", "pid", Integer.class);
hBaseTableSource.addColumn("f", "pname", String.class);
hBaseTableSource.addColumn("f", "pt", String.class);
//Register the lookup function with the table environment
// The first argument is the registered function name, i.e. the table name used in Flink SQL, while the t_product in new HBaseTableSource(conf, "t_product") is the HBase table name
// The second argument is a TableFunction whose result contains the columns declared via addColumn; its parameter is the rowkey, i.e. lookups are done by rowkey (the productID)
tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
}
}
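HBaseTableSource and the sink functions below assume the t_user and t_product tables already exist in HBase with a column family named f; the job does not create them. A minimal sketch of creating them from the HBase shell:
create 't_user', 'f'
create 't_product', 'f'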
HBase sink definitions
//Abstract base class for the HBase sinks
public abstract class AbstractHbaseSinkFunction<OUT> extends RichSinkFunction<OUT> {
protected static String cf_String = "f";
protected static byte[] cf = Bytes.toBytes(cf_String);
protected String tableName = null;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig()
.getGlobalJobParameters();
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", globalParams.toMap().get("hbase.zookeeper.quorum"));
if (null == connection) {
connection = ConnectionFactory.createConnection(conf);
}
}
@Override
public void invoke(OUT value, Context context) throws Exception {
HTable table = null;
try {
table = (HTable) connection.getTable(TableName.valueOf(tableName));
handle(value, context, table);
} finally {
if (table != null) {
table.close();
}
}
}
protected abstract void handle(OUT value, Context context, HTable table) throws Exception;
@Override
public void close() throws Exception {
if (connection != null) {
connection.close();
}
}
protected byte[] getByteValue(Object value){
if(value==null){
return Bytes.toBytes("");
}else{
return Bytes.toBytes(value.toString());
}
}
}
//User sink
public class UserHbaseSinkFunction extends AbstractHbaseSinkFunction<User> {
public UserHbaseSinkFunction(){
tableName = "t_user";
}
@Override
protected void handle(User value, Context context, HTable table) throws Exception {
Integer rowkey1 = value.getUserID();
Put put1 = new Put(Bytes.toBytes(rowkey1));
//The column names here must match those used in the table definition
put1.addColumn(cf, Bytes.toBytes("uid"), getByteValue(value.getUserID()));
put1.addColumn(cf, Bytes.toBytes("uname"), getByteValue(value.getUserName()));
put1.addColumn(cf, Bytes.toBytes("sex"), getByteValue(value.getSex()));
put1.addColumn(cf, Bytes.toBytes("address"), getByteValue(value.getAddress()));
table.put(put1);
}
}
//Product sink
public class ProductHbaseSinkFunction extends AbstractHbaseSinkFunction<Product> {
public ProductHbaseSinkFunction(){
tableName = "t_product";
}
@Override
protected void handle(Product value, Context context, HTable table) throws Exception {
//The column names here must match those used in the table definition
Integer rowkey1 = value.getProductID();
Put put1 = new Put(Bytes.toBytes(rowkey1));
put1.addColumn(cf, Bytes.toBytes("pid"), getByteValue(value.getProductID()));
put1.addColumn(cf, Bytes.toBytes("pname"), getByteValue(value.getProductName()));
put1.addColumn(cf, Bytes.toBytes("pt"), getByteValue(value.getProductType()));
table.put(put1);
}
}
MySQL sink definition
public class OrderMysqlSinkFunction extends RichSinkFunction<List<Result>> {
public static Logger logger = LogManager.getLogger(OrderMysqlSinkFunction.class);
private DataSource dataSource = null;
@Override
public void open(Configuration parameters) throws Exception {
logger.info("MysqlSinkFunction open");
super.open(parameters);
Map<String, String> globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
String dbUrl = globalParams.get("dbUrl");
String dbName = globalParams.get("dbName");
String user = globalParams.get("user");
String psw = globalParams.get("psw");
try {
dataSource = DBUtil.getDataSource(dbUrl, dbName, user, psw);
} catch (Exception e) {
logger.error("" + e);
}
}
@Override
public void invoke(List<Result> results, Context context) throws Exception {
Connection connection = dataSource.getConnection();
PreparedStatement ps = null;
try {
//Build the SQL: userName, productType, productName, buyDate, productCount, totalMoney
String sql = "INSERT INTO t_result (userName, productType, productName, buyDate, productCount, totalMoney) " +
" values (?,?,?,?,?,?) on duplicate key " +
" update productCount = values(productCount),totalMoney=values(totalMoney) ";
ps = connection.prepareStatement(sql);
for (Result record : results) {
ps.setObject(1, record.getUserName());
ps.setObject(2, record.getProductType());
ps.setObject(3, record.getProductName());
ps.setObject(4, record.getBuyDate());
ps.setObject(5, record.getProductCount());
ps.setObject(6, record.getTotalMoney());
ps.addBatch();
}
ps.executeBatch();
}catch (Exception e){
logger.error("Error writing results to the database: " + e);
throw new RuntimeException("Failed to save results to t_result: " + e);
}finally {
DBUtil.close(connection, ps);
}
}
@Override
public void close() {
logger.info("MysqlSinkFunction close");
DBUtil.closeDataSource();
}
}
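The INSERT ... ON DUPLICATE KEY UPDATE statement above only takes effect if t_result has a unique key covering userName, productType, productName and buyDate. The table definition is not part of the original write-up, so the following DDL is only a sketch with assumed column types:
CREATE TABLE t_result (
userName VARCHAR(64) NOT NULL,
productType VARCHAR(64) NOT NULL,
productName VARCHAR(128) NOT NULL,
buyDate VARCHAR(32) NOT NULL,
productCount INT,
totalMoney DOUBLE,
UNIQUE KEY uk_result (userName, productType, productName, buyDate)
);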
Job definitions
public class JobDefine {
//User data is written straight to HBase
public static void userJob(StreamExecutionEnvironment env,Properties props){
//Create the Kafka source stream
SingleOutputStreamOperator<String> userSource = env.addSource(new FlinkKafkaConsumer010<>("topic for the user data", new SimpleStringSchema(), props));
//Convert the Kafka data (JSON strings via SimpleStringSchema) into Java objects; .returns(User.class) is required, otherwise the job fails at startup
SingleOutputStreamOperator<User> userMessageStream=userSource.flatMap(new MyKafkaRichFlatMapFunction<>(User.class)).returns(User.class);
//Save the Java objects to HBase
userMessageStream.addSink(new UserHbaseSinkFunction()).name("UserHbaseSinkFunction");
}
//Product data is written straight to HBase
public static void productJob(StreamExecutionEnvironment env,Properties props){
//Create the Kafka source stream
SingleOutputStreamOperator<String> productSource = env.addSource(new FlinkKafkaConsumer010<>("topic for the product data", new SimpleStringSchema(), props));
//Convert the Kafka data (JSON strings via SimpleStringSchema) into Java objects; .returns(Product.class) is required, otherwise the job fails at startup
SingleOutputStreamOperator<Product> productStream=productSource.flatMap(new MyKafkaRichFlatMapFunction<>(Product.class)).returns(Product.class);
//Save the Java objects to HBase
productStream.addSink(new ProductHbaseSinkFunction()).name("ProductHbaseSinkFunction");
}
//Order data needs real-time aggregation
public static void orderJob(StreamExecutionEnvironment env, StreamTableEnvironment tEnv,Properties props){
SingleOutputStreamOperator<String> orderSource = env.addSource(new FlinkKafkaConsumer010<>("topic for the order data", new SimpleStringSchema(), props));
//Convert the Kafka data (JSON strings via SimpleStringSchema) into Java objects; .returns(Order.class) is required, otherwise the job fails at startup
SingleOutputStreamOperator<Order> orderMessageStream = orderSource.flatMap(new MyKafkaRichFlatMapFunction<>(Order.class)).returns(Order.class);
//Register the order stream with the table environment; the first argument is the table name, the second is the data stream
tEnv.createTemporaryView("t_order", orderMessageStream);
//Compute the order statistics in real time and save the results to MySQL
new OrderFunction().handle(orderMessageStream, env,tEnv);
}
}
Real-time computation
public class OrderFunction {
public void handle(SingleOutputStreamOperator<Order> streamOperator,StreamExecutionEnvironment env,
StreamTableEnvironment tEnv){
/**
 * A few pitfalls worth explaining:
 * 1. Field order: the fields in the select list must match the Result class in order and type, otherwise you get
 *    "Field types of query result and registered TableSink do not match".
 *    Flink sorts the attributes of the result class (Result) in ascending alphabetical order and matches the sorted
 *    types against the types of the select list; as long as the types line up, the conversion succeeds.
 *    For the Result class used here the fields sorted alphabetically are:
 *    [buyDate: STRING, productCount: INT, productName: STRING, productType: STRING, totalMoney: DOUBLE, userName: STRING]
 *    so the select list must produce exactly the types [STRING, INT, STRING, STRING, DOUBLE, STRING].
 *    If the SQL were (sum(o.productCount) and o.buyDate swapped): select sum(o.productCount), o.buyDate, p.pname, p.pt, sum(o.money*o.productCount), u.uname
 *    it would fail with "Field types of query result and registered TableSink do not match".
 *    If the SQL were (pt and pname swapped): select o.buyDate, sum(o.productCount), p.pt, p.pname, sum(o.money*o.productCount), u.uname
 *    then Result.productName would end up holding the pt value and Result.productType the pname value.
 * 2. For the HBase dimension tables the column names are abbreviated (e.g. userName is stored as uname), so the select
 *    list must use the column names from the HBase table definition, e.g. p.pname, u.uname.
 * 3. If a selected field's type differs from the corresponding Result field type, convert it in the SQL with an appropriate function.
 * 4. t_order here must match the view name registered in JobDefine: tEnv.createTemporaryView("t_order", orderMessageStream);
 * 5. t_user here must match the function name registered in TableDefine: tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
 * 6. t_product here must match the function name registered in TableDefine: tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
 *
 */
Table collectionTaskResult = tEnv.sqlQuery("select o.buyDate,sum(o.productCount) ,p.pname,p.pt,sum(o.money*o.productCount) ,u.uname" +
" from t_order o,lateral table(t_user(o.userID)) u,lateral table(t_product(o.productID)) p " +
" where o.userID=u.uid and o.productID=p.pid group by u.uname,p.pt,p.pname,o.buyDate");
DataStream<Tuple2<Boolean, Result>> stream = tEnv.toRetractStream(collectionTaskResult, Result.class);
//windowAll is used here to reduce pressure on the database: flush once every 10 seconds, and before writing, keep only the latest record per unique key
stream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(new ProcessAllWindowFunction<Tuple2<Boolean, Result>, List<Result>, TimeWindow>() {
//values contains every record that arrived within this time window
public void process(Context context, Iterable<Tuple2<Boolean, Result>> values, Collector<List<Result>> out) throws Exception {
Iterator<Tuple2<Boolean, Result>> iterator = values.iterator();
Map<String, Result> result = Maps.newHashMap();
while (iterator.hasNext()) {
Tuple2<Boolean, Result> tuple = iterator.next();
//only collect the accumulate (insert) records of the retract stream
if (tuple.getField(0)) {
Result item = tuple.getField(1);
//the map keeps only the latest record for each unique key
result.put(item.getUserName()+":"+item.getProductType()+":"+item.getProductName()+":"+item.getBuyDate(), item);
}
}
out.collect(Lists.newArrayList(result.values()));
}
}).setParallelism(1)//parallelism is set to 1 to avoid concurrent writes corrupting the data in the database
//Save the Java objects to MySQL
.addSink(new OrderMysqlSinkFunction()).name("OrderMysqlSinkFunction");
}
}
- Flink SQL feels very powerful, but as mentioned at the beginning, it is a high-level abstraction on top of Flink streaming and cannot express everything.
After an order is paid it can still be refunded, and once the refund completes the order becomes invalid, so the statistics should no longer include that order's data.
How would you express that with SQL? I don't know yet, but it can be handled with the lower-level streaming API; I will summarize that in a later post.
MySQL database connection utility class
public class DBUtil {
private static Logger logger = LogManager.getLogger(DBUtil.class);
private static DruidDataSource dataSource;
public static DruidDataSource getDataSource(String dbUrl, String dbName, String username, String password) {
try {
if (dataSource == null) {
synchronized (DBUtil.class) {
if (dataSource == null) {
dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://#/%?autoReconnect=true&useUnicode=true&useAffectedRows=true&characterEncoding=utf8".replace("#", dbUrl).replace("%", dbName));
dataSource.setUsername(username);
dataSource.setPassword(password);
//configuration
dataSource.setInitialSize(20);
dataSource.setMinIdle(20);
dataSource.setMaxActive(100);
dataSource.setMaxWait(60000);
dataSource.setTimeBetweenEvictionRunsMillis(60000);
dataSource.setMinEvictableIdleTimeMillis(300000);
dataSource.setValidationQuery("SELECT 1 FROM DUAL");
dataSource.setTestWhileIdle(true);
dataSource.setTestOnBorrow(false);
dataSource.setTestOnReturn(false);
dataSource.setPoolPreparedStatements(true);
dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
dataSource.setFilters("stat,wall,log4j");
dataSource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");
}
}
}
} catch (Exception e) {
logger.error("初始化数据源信息异常....", e);
}
return dataSource;
}
/**
* Get the connection pool
*/
public static DataSource getDataSource() {
return dataSource;
}
/**
* Close the connection pool
*/
public static void closeDataSource() {
logger.info("method datasource close !");
if (dataSource != null) {
dataSource.close();
}
}
/**
* Get a database connection
*/
public static Connection getConnection() {
Connection conn = null;
try {
conn = dataSource.getConnection();
} catch (SQLException e) {
logger.error("获取数据库连接异常", e);
}
return conn;
}
/**
* Release database resources
*/
public static void close(Connection conn,Statement st,ResultSet rs) {
if(rs!=null) {
try {
rs.close();
} catch (SQLException e) {
logger.error("释放数据库连接异常", e);
}
}
close(conn,st);
}
public static void close(Connection conn,Statement st) {
if(st!=null) {
try {
st.close();
} catch (SQLException e) {
logger.error("关闭Statement异常", e);
}
}
close(conn);
}
public static void close(Statement st) {
if(st!=null) {
try {
st.close();
} catch (SQLException e) {
logger.error("关闭Statement异常", e);
}
}
}
public static void close(Connection conn) {
if(conn!=null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("关闭SConnection异常", e);
}
}
}
public static void commit(Connection conn){
if(conn!=null) {
try {
conn.commit();
} catch (SQLException e) {
logger.error("提交事务异常", e);
}
}
}
public static void rollback(Connection conn){
if(conn!=null) {
try {
conn.rollback();
} catch (SQLException e) {
logger.error("回滚异常", e);
}
}
}
}