SpringBoot+kafka+ES实现信息数据同步管理(下)
上一篇文章主要介绍了项目的整体结构,这篇文章展示具体结构的实现一、项目版本SpringBoot 2.1.23ES:6.7引入jar<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId>
·
上一篇文章主要介绍了项目的整体结构,这篇文章展示具体结构的实现
一、项目版本
SpringBoot 2.1.23 ES:6.7
引入jar
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.7.0</version>
</dependency>
<!--后续引入-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.7.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.7.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<!-- <version>3.2.0.RELEASE</version>-->
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--引入kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
二、配置kafka
kafka配置yml文件
spring:
# kafka配置
kafka:
bootstrap-servers: kafka集群地址+端口
producer:
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 0
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定默认消费者group id
group-id: consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 5000
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#开发环境topic
template:
default-topic: topic_sys_dev
编写kafka发送消息接口:
public interface ISysSendToKafkaService
{
void sendMsgToKafka(Sys sys,String type);
}
kafka消息实现类
@Service
public class SysSendToKafkaServiceImpl implements ISysSendToKafkaService {
private static final Logger log = LoggerFactory.getLogger(SysSendToKafkaServiceImpl.class);
@Value("${spring.kafka.template.default-topic}")
private String kafkaTopic ;
@Autowired
KafkaTemplate kafkaTemplate;
@Override
public void sendMsgToKafka(Sys sys,String type) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("type",type);
jsonObject.put("sys",sys);
kafkaTemplate.send(kafkaTopic,jsonObject.toJSONString());
}
}
以上配置类之后,在SpringBoot的中,对TIDB数据处理完成之后,再调用该方法,将数据发送到kafka中去。
//调用存放到kafka
sendToKafkaService.sendMsgToKafka(sys,"create");
此处,我对前端处理的消息做了标记,页面新增的数据用type=create,修改的数据type=update,删除的数据用type=delete,这样在消费kafka消息的时候明确对ES的数据做什么样的处理
三、kafka队列消息的消费处理
1、在yml中配置ES信息,ES配置的是集群地址
#开发环境es配置地址
qymp:
elasticsearch:
hostlist: 192.168.110.110:9200,192.168.110.111:9200,192.168.110.112:9200
2、配置ESconfig
@Configuration
public class ESConfig {
@Value("${qymp.elasticsearch.hostlist}")
private String hostlist;
@Bean // 高版本客户端
public RestHighLevelClient restHighLevelClient() {
// 解析 hostlist 配置信息。假如以后有多个,则需要用 , 分开
String[] split = hostlist.split(",");
// 创建 HttpHost 数组,其中存放es主机和端口的配置信息
HttpHost[] httpHostArray = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
String item = split[i];
httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
}
// 创建RestHighLevelClient客户端
return new RestHighLevelClient(RestClient.builder(httpHostArray));
}
// 项目主要使用 RestHighLevelClient,对于低级的客户端暂时不用
@Bean
public RestClient restClient() {
// 解析hostlist配置信息
String[] split = hostlist.split(",");
// 创建HttpHost数组,其中存放es主机和端口的配置信息
HttpHost[] httpHostArray = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
String item = split[i];
httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
}
return RestClient.builder(httpHostArray).build();
}
}
3、新建工具类KafkaConsumer,用于监听kafka队列消息,并消费,对ES数据进行同步处理,
@Slf4j
@Component
public class KafkaConsumer {
@Autowired
@Qualifier("restHighLevelClient")
private RestHighLevelClient client;
@KafkaListener(topics = {"${spring.kafka.template.default-topic}"})
public void userConsumer(String message) throws IOException {
JSONObject jsonObject = JSONObject.parseObject(message);
String jsondata = (jsonObject.get("sys")).toString();
Sys sys= FastJsonUtils.getJsonToBean(jsondata,Sys.class);
String type = (jsonObject.get("type")).toString();
if ("delete".equals(type)){
//
deleteData(sys);
}else {
delData(sys);
}
}
//处理数据
public void delData(Sys sysXx) throws IOException {
//判断当前数据是否存在于ES数据库
//1. 创建检索请求
SearchRequest searchRequest0 = new SearchRequest();
//1.1)指定索引
searchRequest0.indices("es_sys");
searchRequest0.types("base_sysxx");
//1.2)构造检索条件
SearchSourceBuilder sourceBuilder0 = new SearchSourceBuilder();
QueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("uuid", sysXx.getQyuuid()))
.mustNot(QueryBuilders.termQuery("yxbz","1"));
sourceBuilder0.query(queryBuilder);//精确查询查询
searchRequest0.source(sourceBuilder0);
//2. 执行检索
SearchResponse searchResponse0 = client.search(searchRequest0, RequestOptions.DEFAULT);
JSONObject jsonObject = FastJsonUtils.toJsonObject(searchResponse0) ;
JSONObject jsonhit = jsonObject.getJSONObject("hits");
JSONArray hits= JSON.parseArray(jsonhit.getString("hits").toString());
String id = null;
for(int i=0;i<hits.size();i++) {
JSONObject json_data = hits.getJSONObject(i);
id = String.valueOf(json_data.get("id")) ;
}
Integer totalHitsNum = Integer.valueOf(jsonhit.get("totalHits").toString());
//进行实体封装
map.put("zt",sysXx.getZt());
map.put("yxj",syXx.getYxj());
map.put("whbz",sysXx.getWhbz());
map.put("xxwzd",sysXx.getXxwzd());
map.put("createBy",sysXx.getCreateBy());
map.put("createTime",sysXx.getCreateTime());
map.put("updateBy",sysXx.getUpdateBy());
map.put("updateTime",sysXx.getUpdateTime());
map.put("uuid",sysXx.getuuid());
map.put("yxbz","0");
if (totalHitsNum>0){
//进行更新操作
IndexRequest indexRequest = new IndexRequest("es_sys","base_sysxx")
.id(id)
.source(map);
indexRequest.opType("index");//可选create或index
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
log.info("----更新ES同步数据----------uuid:"+id);
}else {
IndexRequest indexRequest = new IndexRequest("es_sys","base_sysxx")
.id(sysQympXx.getQyuuid().toString())
.source(map);
indexRequest.opType("create");//可选create或index
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
log.info("----新增ES同步数据----------uuid:"+sysXx.getQyuuid());
}
}
/**
* 删除操作
* @param sysXx
* @throws IOException
*/
public void deleteData(Sys sysXx) throws IOException {
//进行实体封装
Map map = new HashMap();
map.put("zt",sysXx.getZt());
map.put("yxj",syXx.getYxj());
map.put("whbz",sysXx.getWhbz());
map.put("xxwzd",sysXx.getXxwzd());
map.put("createBy",sysXx.getCreateBy());
map.put("createTime",sysXx.getCreateTime());
map.put("updateBy",sysXx.getUpdateBy());
map.put("updateTime",sysXx.getUpdateTime());
map.put("uuid",sysXx.getuuid());
map.put("yxbz","1");//删除的标志是1
//进行更新操作
IndexRequest indexRequest = new IndexRequest("es_sys","base_sysxx")
.id(sysXx.getQyuuid().toString())
.source(map);
indexRequest.opType("index");//可选create或index
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
log.info("----删除ES同步数据---(逻辑删除)-------IndexResponse:"+indexResponse);
}
}
四、SpringBoot控制层调用ES查询方法举例
@GetMapping("/getXl")
public Object getXl(String mc) throws IOException {
List<QympNsrmcIdVo> resultlist = new ArrayList<>();
//1. 创建检索请求
SearchRequest searchRequest = new SearchRequest();
//1.1)指定索引
searchRequest.indices("es_sys");
searchRequest.types("base_sysxx");
//1.2)构造检索条件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
QueryBuilder queryBuilder = QueryBuilders.boolQuery()
//.must(QueryBuilders.queryStringQuery(mc))
.must(QueryBuilders.matchPhraseQuery("mc",mc))
.mustNot(QueryBuilders.termQuery("yxbz", "1"));//模糊查询
// sourceBuilder.query(queryBuilder);//左右模糊
sourceBuilder.query(queryBuilder);
// 设置源字段过虑,第一个参数结果集包括哪些字段,第二个参数表示结果集不包括哪些字段
sourceBuilder.fetchSource(new String[]{"mc", "uuid", "yxj"}, new String[]{});
sourceBuilder.size(10);
searchRequest.source(sourceBuilder);
//2. 执行检索
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
JSONObject jsonObject = FastJsonUtils.toJsonObject(searchResponse);
JSONObject jsonhit = jsonObject.getJSONObject("hits");
JSONArray hitsArray = JSON.parseArray(jsonhit.getString("hits").toString());
for (int i = 0; i < hitsArray.size(); i++) {
McIdVo vo = new McIdVo();
JSONObject hi = (JSONObject) hitsArray.get(i);
String key = (hi.get("id")).toString();
vo.setMcId(key);
Map map = (Map) hi.get("sourceAsMap");
if (map.get("uuid")!=null){
//获取UUId
String uuidStr = (map.get("uuid")).toString();
vo.setuuid(uuidStr);
}
resultlist.add(vo);
}
return AjaxResult.success(resultlist);
}
最后问题就到这:
总结一下:在此次项目中主要问题点在于SpringBoot与ES版本的配置
更多推荐
已为社区贡献2条内容
所有评论(0)