The previous article covered the overall structure of the project; this one walks through the concrete implementation.

I. Project Versions

Spring Boot 2.1.23, Elasticsearch 6.7

Maven dependencies:

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.7.0</version>
        </dependency>

        <!-- added later -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.7.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-smile</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-cbor</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
            <!-- <version>3.2.0.RELEASE</version>-->
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-codec</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

II. Kafka Configuration

Kafka settings in the yml file:

spring:
    # Kafka configuration
    kafka:
        bootstrap-servers: <Kafka cluster addresses, host:port[,host:port...]>

        producer:
            batch-size: 16384
            buffer-memory: 33554432
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            retries: 0
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
            # default consumer group id
            group-id: consumer-group
            auto-offset-reset: earliest
            enable-auto-commit: true
            auto-commit-interval: 5000
            # key / value deserializers
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        # default topic for the dev environment
        template:
            default-topic: topic_sys_dev

Define the interface for sending messages to Kafka:

public interface ISysSendToKafkaService
{
    void sendMsgToKafka(Sys sys,String type);

}

Implementation of the sender:

@Service
public class SysSendToKafkaServiceImpl implements ISysSendToKafkaService {
    private static final Logger log = LoggerFactory.getLogger(SysSendToKafkaServiceImpl.class);

    @Value("${spring.kafka.template.default-topic}")
    private String kafkaTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void sendMsgToKafka(Sys sys, String type) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", type);
        jsonObject.put("sys", sys);
        kafkaTemplate.send(kafkaTopic, jsonObject.toJSONString());
    }
}

With the classes above in place, the Spring Boot service layer calls this method after it finishes processing the TiDB data, pushing the change to Kafka.

// push the change to Kafka
sendToKafkaService.sendMsgToKafka(sys, "create");

Each message carries a type tag so the consumer knows exactly how to update ES: data created on the page is sent with type=create, modified data with type=update, and deleted data with type=delete.
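For illustration, here is a hedged sketch of how the service layer might combine the TiDB write with the Kafka notification; SysServiceImpl and SysMapper are hypothetical names introduced for this example and are not part of the original project:

@Service
public class SysServiceImpl {

    @Autowired
    private ISysSendToKafkaService sendToKafkaService;

    // hypothetical MyBatis mapper for the TiDB table; not shown in the original article
    @Autowired
    private SysMapper sysMapper;

    public int insertSys(Sys sys) {
        int rows = sysMapper.insertSys(sys);               // write to TiDB first
        sendToKafkaService.sendMsgToKafka(sys, "create");  // then publish the change to Kafka
        return rows;
    }

    public int updateSys(Sys sys) {
        int rows = sysMapper.updateSys(sys);
        sendToKafkaService.sendMsgToKafka(sys, "update");
        return rows;
    }

    public int deleteSys(Sys sys) {
        int rows = sysMapper.deleteSys(sys);
        sendToKafkaService.sendMsgToKafka(sys, "delete");
        return rows;
    }
}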

III. Consuming and Processing the Kafka Messages

1. Configure the ES connection in the yml file; what is configured here is the cluster address list.

# ES addresses for the dev environment
qymp:
    elasticsearch:
        hostlist: 192.168.110.110:9200,192.168.110.111:9200,192.168.110.112:9200

2. The ESConfig class

@Configuration
public class ESConfig {
    @Value("${qymp.elasticsearch.hostlist}")
    private String hostlist;

    @Bean // high-level client
    public RestHighLevelClient restHighLevelClient() {
        // parse the hostlist setting; multiple addresses are separated by commas
        String[] split = hostlist.split(",");
        // build an HttpHost array holding the ES host and port entries
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        // create the RestHighLevelClient
        return new RestHighLevelClient(RestClient.builder(httpHostArray));
    }

    // The project mainly uses RestHighLevelClient; the low-level client is defined but not used for now
    @Bean
    public RestClient restClient() {
        // parse the hostlist setting
        String[] split = hostlist.split(",");
        // build an HttpHost array holding the ES host and port entries
        HttpHost[] httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        return RestClient.builder(httpHostArray).build();
    }
}

3. Create a KafkaConsumer component that listens to the Kafka topic, consumes the messages, and keeps the ES data in sync.

@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    @Qualifier("restHighLevelClient")
    private RestHighLevelClient client;


    @KafkaListener(topics = {"${spring.kafka.template.default-topic}"})
    public void userConsumer(String message) throws IOException {
        JSONObject jsonObject = JSONObject.parseObject(message);
        String jsondata = (jsonObject.get("sys")).toString();
        Sys sys= FastJsonUtils.getJsonToBean(jsondata,Sys.class);
        String type = (jsonObject.get("type")).toString();

        if ("delete".equals(type)){
            //
            deleteData(sys);
        }else {
            delData(sys);
        }
    }

    // create / update handling
    public void delData(Sys sysXx) throws IOException {
        // check whether the record already exists in ES
        // 1. build the search request
        SearchRequest searchRequest0 = new SearchRequest();
        // 1.1) index and type
        searchRequest0.indices("es_sys");
        searchRequest0.types("base_sysxx");
        // 1.2) build the query
        SearchSourceBuilder sourceBuilder0 = new SearchSourceBuilder();

        QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("uuid", sysXx.getQyuuid()))
                .mustNot(QueryBuilders.termQuery("yxbz", "1"));

        sourceBuilder0.query(queryBuilder); // exact-match query
        searchRequest0.source(sourceBuilder0);
        // 2. execute the search
        SearchResponse searchResponse0 = client.search(searchRequest0, RequestOptions.DEFAULT);
        JSONObject jsonObject = FastJsonUtils.toJsonObject(searchResponse0);
        JSONObject jsonhit = jsonObject.getJSONObject("hits");
        JSONArray hits = JSON.parseArray(jsonhit.getString("hits"));
        String id = null;

        for (int i = 0; i < hits.size(); i++) {
            JSONObject json_data = hits.getJSONObject(i);
            id = String.valueOf(json_data.get("id"));
        }

        Integer totalHitsNum = Integer.valueOf(jsonhit.get("totalHits").toString());

        // build the document source
        Map<String, Object> map = new HashMap<>();
        map.put("zt", sysXx.getZt());
        map.put("yxj", sysXx.getYxj());
        map.put("whbz", sysXx.getWhbz());
        map.put("xxwzd", sysXx.getXxwzd());
        map.put("createBy", sysXx.getCreateBy());
        map.put("createTime", sysXx.getCreateTime());
        map.put("updateBy", sysXx.getUpdateBy());
        map.put("updateTime", sysXx.getUpdateTime());
        map.put("uuid", sysXx.getuuid());

        map.put("yxbz", "0");

        if (totalHitsNum > 0) {
            // document already exists: update it
            IndexRequest indexRequest = new IndexRequest("es_sys", "base_sysxx")
                    .id(id)
                    .source(map);
            indexRequest.opType("index"); // "create" or "index"
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            log.info("---- ES sync: updated document ---------- uuid:" + id);
        } else {
            // document does not exist yet: create it
            IndexRequest indexRequest = new IndexRequest("es_sys", "base_sysxx")
                    .id(sysXx.getQyuuid().toString())
                    .source(map);
            indexRequest.opType("create"); // "create" or "index"
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            log.info("---- ES sync: created document ---------- uuid:" + sysXx.getQyuuid());
        }
    }


    /**
     * Delete handling (logical delete in ES)
     * @param sysXx
     * @throws IOException
     */
    public void deleteData(Sys sysXx) throws IOException {

        // build the document source
        Map<String, Object> map = new HashMap<>();
        map.put("zt", sysXx.getZt());
        map.put("yxj", sysXx.getYxj());
        map.put("whbz", sysXx.getWhbz());
        map.put("xxwzd", sysXx.getXxwzd());
        map.put("createBy", sysXx.getCreateBy());
        map.put("createTime", sysXx.getCreateTime());
        map.put("updateBy", sysXx.getUpdateBy());
        map.put("updateTime", sysXx.getUpdateTime());
        map.put("uuid", sysXx.getuuid());

        map.put("yxbz", "1"); // yxbz = 1 marks the record as deleted

        // overwrite the document with the delete flag set
        IndexRequest indexRequest = new IndexRequest("es_sys", "base_sysxx")
                .id(sysXx.getQyuuid().toString())
                .source(map);
        indexRequest.opType("index"); // "create" or "index"
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        log.info("---- ES sync: logical delete ---------- IndexResponse:" + indexResponse);

    }



}
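The consumer above also depends on a FastJsonUtils helper that the article does not show. A minimal sketch, assuming it is nothing more than a thin wrapper around fastjson (and that fastjson can serialize the SearchResponse by reflection, as the calling code already assumes):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class FastJsonUtils {

    // deserialize a JSON string into a bean of the given type
    public static <T> T getJsonToBean(String json, Class<T> clazz) {
        return JSON.parseObject(json, clazz);
    }

    // serialize any object (here, the SearchResponse) and re-parse it into a JSONObject
    public static JSONObject toJsonObject(Object obj) {
        return JSON.parseObject(JSON.toJSONString(obj));
    }
}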

IV. Example: Querying ES from a Spring Boot Controller

@GetMapping("/getXl")
    public Object getXl(String mc) throws IOException {
        List<McIdVo> resultlist = new ArrayList<>();
        // 1. build the search request
        SearchRequest searchRequest = new SearchRequest();
        // 1.1) index and type
        searchRequest.indices("es_sys");
        searchRequest.types("base_sysxx");
        // 1.2) build the query
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                //.must(QueryBuilders.queryStringQuery(mc))
                .must(QueryBuilders.matchPhraseQuery("mc", mc))  // phrase match on the name field
                .mustNot(QueryBuilders.termQuery("yxbz", "1"));  // exclude logically deleted records
        sourceBuilder.query(queryBuilder);

        // source filtering: the first array lists the fields to include, the second the fields to exclude
        sourceBuilder.fetchSource(new String[]{"mc", "uuid", "yxj"}, new String[]{});
        sourceBuilder.size(10);
        searchRequest.source(sourceBuilder);
        // 2. execute the search
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        JSONObject jsonObject = FastJsonUtils.toJsonObject(searchResponse);
        JSONObject jsonhit = jsonObject.getJSONObject("hits");
        JSONArray hitsArray = JSON.parseArray(jsonhit.getString("hits"));
        for (int i = 0; i < hitsArray.size(); i++) {
            McIdVo vo = new McIdVo();
            JSONObject hi = (JSONObject) hitsArray.get(i);
            String key = hi.get("id").toString();
            vo.setMcId(key);
            Map map = (Map) hi.get("sourceAsMap");

            if (map.get("uuid") != null) {
                // pick up the business uuid from the document source
                String uuidStr = map.get("uuid").toString();
                vo.setuuid(uuidStr);
            }
            resultlist.add(vo);
        }
        return AjaxResult.success(resultlist);
    }
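McIdVo and the AjaxResult wrapper are project classes that are not shown in the article. For reference, a minimal McIdVo consistent with the setters used above could look like this; the field names are assumptions:

public class McIdVo {
    private String mcId; // ES document id
    private String uuid; // business uuid from the document source

    public String getMcId() { return mcId; }
    public void setMcId(String mcId) { this.mcId = mcId; }

    public String getuuid() { return uuid; }
    public void setuuid(String uuid) { this.uuid = uuid; }
}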

Finally, a quick summary.

To sum up: the main difficulty in this project was getting the Spring Boot and Elasticsearch versions, together with the matching client dependencies, configured consistently.
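For example, when the project inherits from spring-boot-starter-parent, the Elasticsearch version managed by Spring Boot can be pinned in the pom properties so it matches the 6.7.0 clients declared above. A minimal sketch, assuming the standard elasticsearch.version property exposed by spring-boot-dependencies:

    <properties>
        <!-- override the ES version managed by spring-boot-dependencies so transitive and explicit clients agree -->
        <elasticsearch.version>6.7.0</elasticsearch.version>
    </properties>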
