系列目录

使用Kafka、Elasticsearch、Grafana搭建业务监控系统(一)技术选择
使用Kafka、Elasticsearch、Grafana搭建业务监控系统(二)Kafka
使用Kafka、Elasticsearch、Grafana搭建业务监控系统(三)Elasticsearch
使用Kafka、Elasticsearch、Grafana搭建业务监控系统(四)Grafana(填坑ing)

一、Elasticsearch是什么

如果之前学习过ES基础知识可以跳过,直接看spring整合ES

Elasticsearch是一个基于Apache Lucene™的开源搜索引擎。无论在开源还是专有领域,Lucene可以被认为是迄今为止最先进、性能最好的、功能最全的搜索引擎库。
但是,Lucene只是一个库。想要使用它,你必须使用Java来作为开发语言并将其直接集成到你的应用中,更糟糕的是,Lucene非常复杂,你需要深入了解检索的相关知识来理解它是如何工作的。
Elasticsearch也使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单。
不过,Elasticsearch不仅仅是Lucene和全文搜索,我们还能这样去描述它:

  • 分布式的实时文件存储,每个字段都被索引并可被搜索
  • 分布式的实时分析搜索引擎
  • 可以扩展到上百台服务器,处理PB级结构化或非结构化数据

而且,所有的这些功能被集成到一个服务里面,你的应用可以通过简单的RESTful API、各种语言的客户端甚至命令行与之交互。
上手Elasticsearch非常容易。它提供了许多合理的缺省值,并对初学者隐藏了复杂的搜索引擎理论。它开箱即用(安装即可使用),只需很少的学习既可在生产环境中使用。
Elasticsearch在Apache 2 license下许可使用,可以免费下载、使用和修改。
随着你对Elasticsearch的理解加深,你可以根据不同的问题领域定制Elasticsearch的高级特性,这一切都是可配置的,并且配置非常灵活。

简单来说,Elasticsearch就是一个基于Lucene库封装的分布式、可扩展、实时的搜索与数据分析引擎。隐藏了 Lucene 的复杂性,取而代之的提供一套简单一致的 RESTful API。

这里穿插一个Elasticsearch的小故事:

许多年前,一个刚结婚的名叫 Shay Banon 的失业开发者,跟着他的妻子去了伦敦,他的妻子在那里学习厨师。 在寻找一个赚钱的工作的时候,为了给他的妻子做一个食谱搜索引擎,他开始使用 Lucene 的一个早期版本。
直接使用 Lucene 是很难的,因此 Shay 开始做一个抽象层,Java 开发者使用它可以很简单的给他们的程序添加搜索功能。 他发布了他的第一个开源项目 Compass。
后来 Shay 获得了一份工作,主要是高性能,分布式环境下的内存数据网格。这个对于高性能,实时,分布式搜索引擎的需求尤为突出, 他决定重写 Compass,把它变为一个独立的服务并取名 Elasticsearch。
第一个公开版本在2010年2月发布,从此以后,Elasticsearch 已经成为了 Github 上最活跃的项目之一,他拥有超过300名 contributors(目前736名 contributors )。 一家公司已经开始围绕 Elasticsearch 提供商业服务,并开发新的特性,但是,Elasticsearch 将永远开源并对所有人可用。
据说,Shay 的妻子还在等着她的食谱搜索引擎…

好了,言归正传,我们先了解一下ES的几个基本概念:

集群(cluster)

一个集群就是由一个或多个节点组织在一起, 它们共同持有你全部的数据, 并一起提供索引和搜索功能。 一个集群由一个唯一的名字标识, 这个名字默认就是“elasticsearch”。 这个名字很重要, 因为一个节点只能通过指定某个集群的名字,来加入这个集群。在生产环境中显式地设定这个名字是一个好习惯,但是使用默认值来进行测试/开发也是不错的。

注意,一个集群中只包含一个节点是合法的。另外,你也可以拥有多个集群,集群以名字区分。


节点(node)

一个节点是你集群中的一个服务器,作为集群的一部分,它存储你的数据,参与集群的索引和搜索功能。 和集群类似, 一个节点也是由一个名字来标识的, 默认情况下, 这个名字是一个随机的Marvel角色的名字,这个名字会在节点启动时分配给它。这个名字对于管理工作来说很重要,因为在这个管理过程中,你会去确定网络中的哪些 服务器对应于Elasticsearch集群中的哪些节点。

一个节点可以通过配置集群名称的方式来加入一个指定的集群。 默认情况下,每个节点都会被安排加入到一个叫做“elasticsearch”的集群中,这意味着,如果你在你的网络中启动了若干个节点, 并假定它们能够相互发现彼此,它们将会自动地形成并加入到一个叫做“elasticsearch” 的集群中。

在一个集群里可以拥有任意多个节点。而且,如果当前你的网络中没有运行任何Elasticsearch节点,这时启动一个节点,会默认创建并加入一个叫做“elasticsearch”的单节点集群。


索引(index)

一个索引就是一个拥有相似特征的文档的集合。比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来 标识(必须全部是小写字母的),并且当我们要对这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,你能够创建任意多个索引。


类型(type)

在一个索引中,你可以定义一种或多种类型。一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具有一组相同字段的文档定义一个类型。比如说,我们假设你运营一个博客平台 并且将你所有的数据存储到一个索引中。在这个索引中,你可以为用户数据定义一个类型,为博客数据定义另一个类型,当然,也可以为评论数据定义另一个类型。


文档(document)

一个文档是一个可被索引的基础信息单元。比如,你可以拥有某一个客户的文档、某一个产品的一个文档、某个订单的一个文档。文档以JSON格式来表示,而JSON是一个到处存在的互联网数据交互格式。

在一个index/type里面,你可以存储任意多的文档。注意,一个文档物理上存在于一个索引之中,但文档必须被索引/赋予一个索引的type。


分片和复制(shards and replicas)

一个索引可以存储超出单个结点硬件限制的大量数据。比如,一个具有10亿文档的索引占据1TB的磁盘空间,而任一节点可能没有这样大的磁盘空间来存储或者单个节点处理搜索请求,响应会太慢。

为了解决这个问题,Elasticsearch提供了将索引划分成多片的能力,这些片叫做分片。当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分片本身也是一个功能完善并且独立的“索引”,这个“索引” 可以被放置到集群中的任何节点上。

分片之所以重要,主要有两方面的原因:

  • 允许你水平分割/扩展你的内容容量
  • 允许你在分片(位于多个节点上)之上进行分布式的、并行的操作,进而提高性能/吞吐量

至于一个分片怎样分布,它的文档怎样聚合回搜索请求,是完全由Elasticsearch管理的,对于作为用户的你来说,这些都是透明的。

在一个网络/云的环境里,失败随时都可能发生。在某个分片/节点因为某些原因处于离线状态或者消失的情况下,故障转移机制是非常有用且强烈推荐的。为此, Elasticsearch允许你创建分片的一份或多份拷贝,这些拷贝叫做复制分片,或者直接叫复制。

复制之所以重要,有两个主要原因:

  • 在分片/节点失败的情况下,复制提供了高可用性。复制分片不与原/主要分片置于同一节点上是非常重要的。
  • 因为搜索可以在所有的复制上并行运行,复制可以扩展你的搜索量/吞吐量
    总之,每个索引可以被分成多个分片。一个索引也可以被复制0次(即没有复制) 或多次。一旦复制了,每个索引就有了主分片(作为复制源的分片)和复制分片(主分片的拷贝)。 分片和复制的数量可以在索引创建的时候指定。在索引创建之后,你可以在任何时候动态地改变复制的数量,但是你不能再改变分片的数量。

默认情况下,Elasticsearch中的每个索引分配5个主分片和1个复制。这意味着,如果你的集群中至少有两个节点,你的索引将会有5个主分片和另外5个复制分片(1个完全拷贝),这样每个索引总共就有10个分片。


我们用关系型数据库Mysql类比Elasticsearch,可以更好的理解ES
在这里插入图片描述

(1)关系型数据库中的数据库(DataBase),等价于ES中的索引(Index)
(2)一个数据库下面有N张表(Table),等价于1个索引Index下面有N多类型(Type),
(3)一个数据库表(Table)下的数据由多行(ROW)多列(column,属性)组成,等价于1个Type由多个文档(Document)和多Field组成。
(4)在一个关系型数据库里面,schema定义了表、每个表的字段,还有表和字段之间的关系。 与之对应的,在ES中:Mapping定义索引下的Type的字段处理规则,即索引如何建立、索引类型、是否保存原始索引JSON文档、是否压缩原始JSON文档、是否需要分词处理、如何进行分词处理等。
(5)在数据库中的增insert、删delete、改update、查search操作等价于ES中的增PUT/POST、删Delete、改_update、查GET.

关于分片和副本,我们可以从head插件直观理解(建议安装)
在这里插入图片描述
上图中20181025索引有5个主分片、2个副本,所以最终应该有5*(2+1)=15个分片
在这里插入图片描述
head插件中主分片用粗线标出,我们可以发现主分片以及副本分布在哪个节点都是完全随机,由ES管理的


ES的基本概念介绍到这里,由于ES基础知识繁多,我也是接触不久,更多的ES基础知识请参考下方链接,本文不做过多阐述。

二、spring-elasticsearch配置

pom文件配置
    <!-- elasticsearch client -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-elasticsearch</artifactId>
        <version>2.0.4.RELEASE</version>
    </dependency>

公司使用的ES版本有点老…各位可以根据实际情况选择
关于spring-data-elasticsearch与elasticsearch的对应关系官方文档是这样说明的:github地址

spring data elasticsearchelasticsearch
3.1.x6.2.2
3.0.x5.5.0
2.1.x2.4.0
2.0.x2.2.0
1.3.x1.5.2
spring-elasticsearch.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:elasticsearch="http://www.springframework.org/schema/data/elasticsearch"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/data/elasticsearch http://www.springframework.org/schema/data/elasticsearch/spring-elasticsearch.xsd
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <elasticsearch:transport-client id="esTransportClient"
        cluster-nodes="127.0.0.1:9300" cluster-name="elasticsearch" />

    <bean name="elasticsearchTemplate"
        class="org.springframework.data.elasticsearch.core.ElasticsearchTemplate">
        <constructor-arg ref="esTransportClient" />
    </bean>
</beans>

cluster-nodes是集群中的节点地址,如果是集群用,隔开。这里注意一下java中节点通信默认9300端口,而RESTful API默认使用9200端口通信
cluster-name是集群名称,默认值是elasticsearch

三、代码实现

ElasticsearchTemplate

ElasticsearchTemplate是Spring对ES的java api进行的封装。
常用的几个方法:

	/**
	 * Bulk index all objects. Will do save or update
	 *
	 * @param queries
	 */
	void bulkIndex(List<IndexQuery> queries);

	/**
	 * return number of elements found by given query
	 *
	 * @param query
	 * @return
	 */
	<T> long count(SearchQuery query);

	/**
	 * Create an index for given indexName
	 *
	 * @param indexName
	 */
	boolean createIndex(String indexName);

	/**
	 * Create mapping for a given indexName and type
	 *
	 * @param indexName
	 * @param type
	 * @param mappings
	 */
	boolean putMapping(String indexName, String type, Object mappings);

	/**
	 * Deletes an index for given indexName
	 *
	 * @param indexName
	 * @return
	 */
	boolean deleteIndex(String indexName);

	/**
	 * Execute the search query against elasticsearch and return result as {@link List}
	 *
	 * @param query
	 * @param clazz
	 * @param <T>
	 * @return
	 */
	<T> List<T> queryForList(SearchQuery query, Class<T> clazz);

其中对于监控业务来说,主要需要用到新增/删除索引、创建mapping、批量写入数据,而查询或者统计更多是在grafana展示曾处理。但是条件查询是spring-es中很重要的一环,我也会介绍一下最常使用的queryForList方法

Kafka数据写入ES

先看下bulkIndex的参数List<IndexQuery> queries

package org.springframework.data.elasticsearch.core.query;

/**
 * IndexQuery
 *
 * @author Rizwan Idrees
 * @author Mohsin Husen
 */

public class IndexQuery {

	private String id;
	private Object object;
	private Long version;
	private String indexName;
	private String type;
	private String source;
	private String parentId;

	//省略get set
}

ides中文档的主键,写入时不用处理
objectsoruce就是实际的文档内容,object是写入的实体类对象,sourcejson格式字符串
version是用来控制版本的,不用处理
indexName是索引名
type是类型名

构建完IndexQuery后,就可以调用方法插入数据了,具体插入代码如下:

EsIndexBO 是我构建的一个实体类,JSONObject 是alibaba的fastjson

public class EsIndexBO {

    /**
     * 索引名
     */
    private String indexName;

    /**
     * 类型
     */
    private String type;

    /**
     * 文档内容
     */
    private JSONObject source;
    
    //忽略get set
}

    public void bulkIndex(List<EsIndexBO> esIndexBOList) {

        if (CollectionUtils.isEmpty(esIndexBOList)) {
            LOGGER.warn(".bulkIndex() esIndexBOList is emtpy");
            return;
        }

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        Date date = new Date();

        List<IndexQuery> indexList = new ArrayList<>();

        int counter = 0;

        for (EsIndexBO esIndexBO : esIndexBOList) {

            IndexQuery indexQuery = new IndexQuery();

            JSONObject source = esIndexBO.getSource();
            //这个字段的用处后面会有说明
            source.put("esTimeField", sdf.format(date) + "+08:00");

            indexQuery.setSource(JSON.toJSONString(source));
            indexQuery.setIndexName(esIndexBO.getIndexName());
            indexQuery.setType(esIndexBO.getType());

            indexList.add(indexQuery);

            if (counter % 1000 == 0) {
                bulkIndexWithRetry(indexList);
                indexList.clear();
            }

            counter++;
        }

        if (indexList.size() > 0) {
            bulkIndexWithRetry(indexList);
        }

    }

    private void bulkIndexWithRetry(List<IndexQuery> indexList) {
        try {
            elasticsearchTemplate.bulkIndex(indexList);
        } catch (Exception e) {

            Random random = new Random();
            int millis = (random.nextInt(5) + 1) * 1000;

            try {
                Thread.sleep(millis);
            } catch (InterruptedException e1) {
                throw new RuntimeException(e1);
            }

            LOGGER.warn("bulkIndex failed, now retry. ExceptionMsg={}", e.getMessage());
            elasticsearchTemplate.bulkIndex(indexList);
        }
    }

每到1000就会批量插入一次,并且会有一次失败尝试的机会
索引不存在的话会自动创建,并且有默认的mapping
非常重要:关于esTimeField这个额外添加的字段,主要是用于之后grafana分析ES数据时的聚合查询,并且必须符合ES要求的时间格式yyyyMMdd'T'HHmmssZ。由于中国属于东八区,所以Z替换为+08:00。参考官方文档:built-in-date-formats

定时任务删除索引

由于线上数据量巨大,所以创建的索引是按天分割,这个从我上面head插件的图片就能看出
因为是监控数据,我不会保留太久,所以需要定期删除数据,也就是按天删除索引

配置spring定时任务

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
 http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- 定时任务相关配置 -->
    <task:executor id="executor" pool-size="10"
        queue-capacity="128" />
    <task:scheduler id="scheduler" pool-size="10" />
    <task:annotation-driven executor="executor"
        scheduler="scheduler" />

</beans>

定时任务cron表达式 每天凌晨1点执行(如果是多节点需采用分布式锁或根据ip判断),删除7天前的那个索引

@Component
public class DropIndexTask {

    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;

    /**
     * log
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(DropIndexTask.class);

    @Scheduled(cron = "0 0 1 * * ?")
    public void dropNgLogIndex() {
        try {
            String deleteIndex = dateAdd(-7);
            boolean result = elasticsearchTemplate.deleteIndex(deleteIndex);
            if (!result) {
                LOGGER.warn("index:{} delete fail", deleteIndex);
            } else {
                LOGGER.info("index:{} delete success", deleteIndex);
            }
        } catch (Exception e) {
            LOGGER.error("dropIndex task error,error message:{}", e.getMessage(), e);
        }
    }

    /**
     * 获得当前日期前n天的日期
     */
    private String dateAdd(int interval) {
        // 得到当前时间n天前的日期并转换成yyyyMMdd格式
        Calendar rightNow = Calendar.getInstance();
        rightNow.setTime(new Date());
        rightNow.add(Calendar.DAY_OF_YEAR, interval);
        Date date = rightNow.getTime();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
        return sdf.format(date);
    }
}
定时任务创建索引以及mapping

为什么没有一开始就提到这个呢?是因为写入ES时没有索引会自动创建,并且下面的type会创建默认的mapping,但是在最后grafana分析的时候发现了问题,导致我们必须回过头来手动创建。

举个例子,假设创建了一个index为20181101,type为test,有一个字段packageName以及时间字段esTimeField,那么这时候通过RESTful查询http://127.0.0.1:9200/20181101/test/_mapping会得到该type的mapping:

{
  "20181101": {
    "mappings": {
      "test": {
        "properties": {
          "packageName": {
            "type": "string"
          },
          "esTimeField": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          },
        }
      }
    }
  }
}

ES中有一个分词的概念,并且字段默认都是分词的,这样就会导致一个问题。假设packageName有2个值com.tecent.wechatcom.tecent.qq,由于默认分词,ES根据.会分成多个词语,导致最终分成了多组
所以我们需要将packageName这个字段的mapping设置为不分词,也就是not_analyzed,这样才能正确的分为2组

同时,出现这个问题后我做了相关调查,了解了ES建立index、type以及mapping的规则,并且自己做了实验,这里用通俗易懂的文字分享给大家

同一个index下先创建了template1的mapping,如下所示;这时候可以创建template2和template3,但是不可以创建template4
就是说 在一个index下只要创建好了一个type的mapping(不管是主动创建还是写入数据自动创建的),这时候可以在新的type下多出一些字段创建新的mapping规则,也可以去掉一些字段创建新的mapping规则,但是不可以修改相同位置且名字一样的字段的mapping规则。
在这里插入图片描述

总结:index下想要新创建一个type的mapping 先和该index下所有的type的mapping对比 可以多 可以少 但是不能修改相同字段(位置结构、名字都相同)的mapping规则。在一个index下,不管有多少type,相同字段(位置结构、名字都相同)的mapping只能有一个

重要:由于写入数据会自动创建索引以及type的mapping,所以我们需要提前创建,那么定时任务就需要在前一天创建好index和mapping(我是在每天的23点创建第二天的)
这里省略定时任务代码,可以参考上面删除索引的

定义存入ES的实体类,假设只有1个字段packageName

@Document(indexName = "stdomainindexmappingindex")
public class StDomainIndexMapping {

    @Field(type = FieldType.String, index = FieldIndex.not_analyzed)
    private String packageName;

}

我们看一下@Document注解

@Persistent
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Document {

	String indexName();

	String type() default "";

	boolean useServerConfiguration() default false;

	short shards() default 5;

	short replicas() default 1;

	String refreshInterval() default "1s";

	String indexStoreType() default "fs";

	boolean createIndex() default true;
}

indexName必填,其他我都采用默认值。提前说明一下,如果type没有填写,在之后的putMapping操作结束后,该index下会自动生成一个实体类类名全小写的type以及我们定义的mapping

{
  "20181101": {
    "mappings": {
      "stdomainindexmapping ": {
        "properties": {
          "packageName": {
            "type": "string",
            "index": "not_analyzed"
          },
          "esTimeField": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          },
        }
      }
    }
  }
}

另外然后按照我前面总结的mapping规则,之后type对应的字段都会适用mapping,可能不太好理解,大家自己尝试下就懂了

再看一下@Field注解

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface Field {

	FieldType type() default FieldType.Auto;

	FieldIndex index() default FieldIndex.analyzed;

	DateFormat format() default DateFormat.none;

	String pattern() default "";

	boolean store() default false;

	String searchAnalyzer() default "";

	String analyzer() default "";

	String[] ignoreFields() default {};

	boolean includeInParent() default false;
}

对我来说需要使用的就是typeindex,将index设为FieldIndex.not_analyzed就是不分词。另外如果有嵌套对象,type需要使用FieldType.Object,嵌套对象只需要@Field,不需要@Document

到这里大家可能会有疑问,@Document需要把索引名写在注解里,我给的默认名字是stdomainindexmappingindex,而我又是按天创建索引的,如何动态修改成20181101的呢?答案是用反射处理

先放上反射工具类

    public static Object changeAnnotationValue(Annotation annotation, String key, Object newValue) {

        Object handler = Proxy.getInvocationHandler(annotation);

        Field f;

        try {
            f = handler.getClass().getDeclaredField("memberValues");
        } catch (NoSuchFieldException | SecurityException e) {
            throw new IllegalStateException(e);
        }

        f.setAccessible(true);

        Map<String, Object> memberValues;
        try {
            memberValues = (Map<String, Object>) f.get(handler);
        } catch (IllegalArgumentException | IllegalAccessException e) {
            throw new IllegalStateException(e);
        }

        Object oldValue = memberValues.get(key);
        if (null == oldValue || oldValue.getClass() != newValue.getClass()) {
            throw new IllegalArgumentException("oldValue's class and newValue's class not mapping.");
        }

        memberValues.put(key, newValue);

        return oldValue;
    }

创建index以及mapping

    public void createIndexMapping(Class<?> clazz) {
        // 我们采用的策略是当天晚上执行定时任务,提前创建第二天的索引和mapping
        String index = dateAdd(1);
        Document annotation = (Document) clazz.getAnnotation(Document.class);
        ReflectUtil.changeAnnotationValue(annotation, "indexName", index);
        insertIndexMapping(clazz);
    }

    private boolean insertIndexMapping(Class<?> clazz) {
        boolean createResult = elasticsearchTemplate.createIndex(clazz);
        if (createResult) {
            boolean putMappingResult = elasticsearchTemplate.putMapping(clazz);
        }
    }

我们在定时任务中调用该方法并传入StDomainIndexMapping.class参数就可以创建index和mapping了


本以为这样就可以了,但随之而来出现了2个严重问题:

  1. 创建mapping的时候会出现异常java.lang.NoSuchMethodError:org.springframework.core.annotation.AnnotatedElementUtils.findMergedAnnotation
    原因是项目使用的springframework版本为4.1.6,而spring-data-elasticsearch 2.1.7默认依赖的spring-context是4.3.11,所以初步确定是我们的项目使用的spring版本太低导致。
    参考spring的api文档,发现原来AnnotatedElementUtils.findMergedAnnotation是4.2版才有的
    所以我们需要修改pom文件将引用的spring版本替换为4.2以上,这也是我在Kafka篇一开始所提到的
  2. 解决了报错问题,项目正式上线,第二天的数据没有问题,但是第三天在grafana分组查询时又将词语做了分词处理。
    这是怎么回事呢?检查了es的索引mapping,发现第三天并没有主动创建,而是被动写入时创建了默认的mapping,默认分词。
    问题不应该出现在反射工具类上,如果有问题,第二天肯定也不会创建正确的mapping呀。那么问题就很可能出现在putMapping方法上,需要跟一下源码!
	@Override
	public <T> boolean putMapping(Class<T> clazz) {
		if (clazz.isAnnotationPresent(Mapping.class)) {
			String mappingPath = clazz.getAnnotation(Mapping.class).mappingPath();
			if (isNotBlank(mappingPath)) {
				String mappings = readFileFromClasspath(mappingPath);
				if (isNotBlank(mappings)) {
					return putMapping(clazz, mappings);
				}
			} else {
				logger.info("mappingPath in @Mapping has to be defined. Building mappings using @Field");
			}
		}
		ElasticsearchPersistentEntity<T> persistentEntity = getPersistentEntityFor(clazz);
		XContentBuilder xContentBuilder = null;
		try {
			xContentBuilder = buildMapping(clazz, persistentEntity.getIndexType(), persistentEntity
					.getIdProperty().getFieldName(), persistentEntity.getParentType());
		} catch (Exception e) {
			throw new ElasticsearchException("Failed to build mapping for " + clazz.getSimpleName(), e);
		}
		return putMapping(clazz, xContentBuilder);
	}

	@Override
	public <T> boolean putMapping(Class<T> clazz, Object mapping) {
		return putMapping(getPersistentEntityFor(clazz).getIndexName(), getPersistentEntityFor(clazz).getIndexType(), mapping);
	}

	@Override
	public boolean putMapping(String indexName, String type, Object mapping) {
		Assert.notNull(indexName, "No index defined for putMapping()");
		Assert.notNull(type, "No type defined for putMapping()");
		PutMappingRequestBuilder requestBuilder = client.admin().indices()
				.preparePutMapping(indexName).setType(type);
		if (mapping instanceof String) {
			requestBuilder.setSource(String.valueOf(mapping));
		} else if (mapping instanceof Map) {
			requestBuilder.setSource((Map) mapping);
		} else if (mapping instanceof XContentBuilder) {
			requestBuilder.setSource((XContentBuilder) mapping);
		}
		return requestBuilder.execute().actionGet().isAcknowledged();
	}

putMapping有3个重载方法,我们调用的是第一个,其实经过处理,我们最终还是调用的还是第三个方法。是不是中间indexName获取的时候出现了问题呢?
看一下是如何获取indexName的,getPersistentEntityFor(clazz).getIndexName(),跟进去看一下,其中有这么一个方法:

	public E getPersistentEntity(Class<?> type) {
		return getPersistentEntity(ClassTypeInformation.from(type));
	}

跟进from方法

	private static final Map<Class<?>, Reference<ClassTypeInformation<?>>> CACHE = Collections
			.synchronizedMap(new WeakHashMap<Class<?>, Reference<ClassTypeInformation<?>>>());

	static {
		for (ClassTypeInformation<?> info : Arrays.asList(COLLECTION, LIST, SET, MAP, OBJECT)) {
			CACHE.put(info.getType(), new WeakReference<ClassTypeInformation<?>>(info));
		}
	}

	private final Class<S> type;

	/**
	 * Simple factory method to easily create new instances of {@link ClassTypeInformation}.
	 * 
	 * @param <S>
	 * @param type must not be {@literal null}.
	 * @return
	 */
	public static <S> ClassTypeInformation<S> from(Class<S> type) {

		Assert.notNull(type, "Type must not be null!");

		Reference<ClassTypeInformation<?>> cachedReference = CACHE.get(type);
		TypeInformation<?> cachedTypeInfo = cachedReference == null ? null : cachedReference.get();

		if (cachedTypeInfo != null) {
			return (ClassTypeInformation<S>) cachedTypeInfo;
		}

		ClassTypeInformation<S> result = new ClassTypeInformation<S>(type);
		CACHE.put(type, new WeakReference<ClassTypeInformation<?>>(result));
		return result;
	}

原来如此,springframework包中的这个方法会把类信息缓存再Map中,导致我们之后取的都是历史值。
那么下面就很容易了,我们只需要在调用public boolean putMapping(String indexName, String type, Object mapping)方法的时候手动赋上indexName就可以了,这就需要我们手动构建一个mapping,也就是生成一个XContentBuilder ,最终代码如下

    public void createIndexMapping(Class<?> clazz) {
        // 我们采用的策略是当天晚上执行定时任务,提前创建第二天的索引和mapping
        String index = dateAdd(1);
        Document annotation = (Document) clazz.getAnnotation(Document.class);
        ReflectUtil.changeAnnotationValue(annotation, "indexName", index);
        insertIndexMapping(clazz);
    }

    private boolean insertIndexMapping(Class<?> clazz) {

        Document annotation = (Document) clazz.getAnnotation(Document.class);

        String indexName = annotation.indexName();

        ElasticsearchPersistentEntity<?> persistentEntity = elasticsearchTemplate.getPersistentEntityFor(clazz);

        XContentBuilder xContentBuilder = null;
        try {
            xContentBuilder = MappingBuilder.buildMapping(clazz, persistentEntity.getIndexType(), "",
                    persistentEntity.getParentType());
        } catch (IOException e) {
            LOGGER.error(".insertIndexMapping() IOException={}", e.getMessage(), e);
            throw new RuntimeException(e);
        }

        boolean createIndexResult = elasticsearchTemplate.createIndex(indexName);

        if (createIndexResult) {
            boolean putMappingResult = elasticsearchTemplate.putMapping(indexName, persistentEntity.getIndexType(),
                    xContentBuilder);
        }
    }

需要注意的是源代码中MappingBuilder.buildMapping方法的访问修饰符是default,所以无法访问到,需要将源代码拷贝一份到相同包下

到这里,定时创建/删除索引、创建mapping、动态索引名、写入ES数据就都介绍完了。

四、spring-elasticsearch中的查询操作

简单介绍一下spring-es中的查询,主要使用queryForList方法

	@Override
	public <T> List<T> queryForList(SearchQuery query, Class<T> clazz) {
		return queryForPage(query, clazz).getContent();
	}

ES的查询方法需要的参数SearchQuery是一个接口,有一个实现类叫NativeSearchQuery,实际使用中,我们的主要任务就是构建NativeSearchQuery来完成一些复杂的查询的。

多数情况需要使用实现类BoolQueryBuilder进行组合查询:

must

代表返回的文档必须满足must子句的条件,会参与计算分值;

filter

代表返回的文档必须满足filter子句的条件,但不会参与计算分值;

should

代表返回的文档可能满足should子句的条件,也可能不满足,有多个should时满足任何一个就可以,通过minimum_should_match设置至少满足几个。

mustnot

代表必须不满足子句的条件。

termQuerytermsQuery为精确查询,matchQuery为模糊查询,matchPhraseQuery为短语匹配,类似mysql的%xxx%

直接上代码举个例子让大家更好理解:

        // 构建QueryBuilder
        // 精确查找出开发者名为"tecent",游戏名包含"刺激战场"这个短语,精确匹配支付状态不为"UNPAID"和"PAYING",支付时间在"20181101"和"20181102"之间
        BoolQueryBuilder bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("appDeveloper", "tecent"))
                .filter(QueryBuilders.matchPhraseQuery("packageName", "刺激战场"))
                .mustNot(QueryBuilders.termsQuery("payStatus", "UNPAID", "PAYING"))
                .filter(QueryBuilders.rangeQuery("PAYTIME").gte("20181101").lte("20181102"));
        // 按照支付时间降序排序
        Sort sort = new Sort(Direction.DESC, "PAYTIME");
        // JPA规定中 页码是从0开始的
        Pageable pageable = new PageRequest(0, 10, sort);
        // 构建NativeSearchQuery
        NativeSearchQuery query = new NativeSearchQueryBuilder().withIndices("20181101", "20181102").withTypes("order")
                .withFilter(bqb).withPageable(pageable).build();

        elasticsearchTemplate.queryForList(query, OrderEs.class);

这里只是简单介绍一下,更多内容请参考官方文档

五、结语

监控系统的第二步Elasticsearch已经全部结束了,下面将介绍第三步:Grafana展示

参考文档

Elasticsearch学习,请先看这一篇
elasticsearch spring 集成
spring-data-elasticsearch使用笔记
ElasticSearchRepository和ElasticSearchTemplate的使用

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐