聚合可以极其方便的实现对数据的统计、分析,例如:,我来为大家科普一下关于elasticsearch集群架构?下面希望有你要的答案,我们一起来看看吧!

elasticsearch集群架构(ElasticSearch聚合集群和SpringData)

elasticsearch集群架构

聚合 Aggregations

聚合可以极其方便的实现对数据的统计、分析,例如:

实现这些统计功能的比结构化数据库的 SQL 要方便的多,而且查询速度非常快,可以实现近实时搜索效果。

基本概念

Elasticsearch 中的聚合,包含多种类型,最常用的两种,一个叫”桶“ ,一个叫”度量“。

桶(bucket)类似于 Group By。

桶的作用,是按照某种方式对数据进行分组,每一组数据在 ES 中称为一个 桶 ,例如根据国籍对人划分,可以得到中国桶 、英国桶、日本桶等等,或者按照年龄段对人进行划分:0~10, 10~20, 20~30, 30~40 等。

Elasticsearch 中提供的划分桶的方式有很多:

综上所述,bucket aggregations 只负责对数据进行分组,并不进行计算,因此往往 bucket 中往往会嵌套另一种聚合:度量 - metrics aggregations。

度量(metrics)相当于聚合的结果。

分组完成以后,一般会对组中的数据进行聚合运算,例如求平均值、最大、最小、求和等,这些在 ES 中称为度量。

比较常用的一些度量聚合方式:

为了测试聚合,先批量导入一些数据。

创建索引:

PUT /car { "mappings": { "orders": { "properties": { "color": { "type": "keyword" }, "make": { "type": "keyword" } } } } }

注意:在 ES 中,需要进行聚合、排序、过滤的字段其处理方式比较特殊,因此不能被分词,必须使用 keyword 或数值类型 。这里将 color 和 make 这两个文字类型的字段设置为 keyword 类型,这个类型不会被分词,将来就可以参与聚合。

导入数据,这里是采用批处理的 API,可以直接复制到 Kibana 运行即可:

POST /car/orders/_bulk { "index": {}} { "price" : 10000, "color" : "红", "make" : "本田", "sold" : "2020-10-28" } { "index": {}} { "price" : 20000, "color" : "红", "make" : "本田", "sold" : "2020-11-05" } { "index": {}} { "price" : 30000, "color" : "绿", "make" : "福特", "sold" : "2020-05-18" } { "index": {}} { "price" : 15000, "color" : "蓝", "make" : "丰田", "sold" : "2020-07-02" } { "index": {}} { "price" : 12000, "color" : "绿", "make" : "丰田", "sold" : "2020-08-19" } { "index": {}} { "price" : 20000, "color" : "红", "make" : "本田", "sold" : "2020-11-05" } { "index": {}} { "price" : 80000, "color" : "红", "make" : "宝马", "sold" : "2020-01-01" } { "index": {}} { "price" : 25000, "color" : "蓝", "make" : "福特", "sold" : "2020-02-12" }

聚合为桶

首先,按照汽车的颜色 color 来划分桶,按照颜色分桶,最好是使用 TermAggregation 类型,按照颜色的名称来分桶。

GET /car/_search { "size" : 0, "aggs" : { "popular_colors" : { "terms" : { "field" : "color" } } } }

分析:

size:查询条数,这里设置为 0,因为不关心搜索到的数据,只关心聚合结果,提高效率 aggs:声明这是一个聚合查询,是 aggregations 的缩写 popular_colors:给这次聚合起一个名字,可任意指定 terms:聚合的类型,这里选择 terms,是根据词条内容(这里是颜色)划分 field:划分桶时依赖的字段

结果:

{ "took": 32, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 8, "max_score": 0, "hits": [] }, "aggregations": { "popular_colors": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "红", "doc_count": 4 }, { "key": "绿", "doc_count": 2 }, { "key": "蓝", "doc_count": 2 } ] } } }

结果分析:

hits:查询结果为空,因为设置了 size 为 0 aggregations:聚合的结果 popular_colors:定义的聚合名称 buckets:查找到的桶,每个不同的 color 字段值都会形成一个桶 key:这个桶对应的 color 字段的值 doc_count:这个桶中的文档数量

通过聚合的结果发现,目前红色的小车比较畅销。

桶内度量

前面的例子展示每个桶里面的文档数量,这很有用。 但通常,应用需要提供更复杂的文档度量。 例如,每种颜色汽车的平均价格是多少?

因此,需要告诉 Elasticsearch 使用哪个字段,使用何种度量方式进行运算,这些信息要嵌套在 桶内,度量的运算会基于桶内的文档进行。

现在,为刚刚的聚合结果添加求价格平均值的度量:

GET /car/_search { "size" : 0, "aggs" : { "popular_colors" : { "terms" : { "field" : "color" }, "aggs":{ "avg_price": { "avg": { "field": "price" } } } } } }

结果:

{ "took": 8, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 8, "max_score": 0, "hits": [] }, "aggregations": { "popular_colors": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "红", "doc_count": 4, "avg_price": { "value": 32500 } }, { "key": "绿", "doc_count": 2, "avg_price": { "value": 21000 } }, { "key": "蓝", "doc_count": 2, "avg_price": { "value": 20000 } } ] } } }

可以看到每个桶中都有自己的 avg_price 字段,这是度量聚合的结果。

Elasticsearch 集群单点的问题

单点的 Elasticsearch 存在的问题:

所以,为了应对这些问题,需要对 Elasticsearch 搭建集群。

集群的结构数据分片

首先,面临的第一个问题就是数据量太大,单点存储量有限的问题。

可以把数据拆分成多份,每一份存储到不同机器节点(node),从而实现减少每个节点数据量的目的。这就是数据的分布式存储,也叫做: 数据分片(Shard)。

完整索引库indices ----> [分片shard1, 分片shard2, 分片shard3] 创建索引,分为三个分片, 将每个分片放在不同的集群节点中,以此实现高存储。

数据备份

数据分片解决了海量数据存储的问题,但是如果出现单点故障,那么分片数据就不再完整,这又该如何解决呢?

可以给每个分片数据进行备份,存储到其它节点,防止数据丢失,这就是数据备份,也叫数据副本(replica) 。

数据备份可以保证高可用,但是每个分片备份一份,所需要的节点数量就会翻一倍,成本过高。

为了在高可用和成本间寻求平衡:

这样可以大大减少所需要的服务节点数量。

以 3 分片,每个分片备份一份为例:

node-01 : 0, 2 node-02 : 0, 1 node-03 : 1, 2 ​ 集群有三个节点,分别是 node-01、node-02、node-03; 新建索引 renda,指定分片为 3,副本为 1,三个主数据,三个副本; 三个分片为 0,1,2。 ​ 0 对应 node-01, 1 对应 node-02, 2 对应 node-03,

在这个集群中,如果出现单节点故障,并不会导致数据缺失,所以保证了集群的高可用,同时也减少了节点中数据存储量。并且因为是多个节点存储数据,因此用户请求也会分发到不同服务器,并发能力也得到了一定的提升。

搭建集群

集群需要多台机器,这里用一台机器来模拟,因此需要在一台虚拟机中部署多个 Elasticsearch 节点,每个 Elasticsearch 的端口都必须不一样。

一台机器进行模拟:将 ES 的安装包复制三份,修改端口号,data 和 log 存放位置的不同。

实际开发中:将每个 ES 节点放在不同的服务器上。

集群名称为:renda-elastic,部署 3 个 elasticsearch 节点,分别是:

http:表示使用 http 协议进行访问时使用端口,elasticsearch-head、kibana、postman,默认端口号是 9200。

tcp:集群间的各个节点进行通讯的端口,默认 9300。

第一步:复制 es 软件粘贴 3 次,分别改名。

第二步:修改每一个节点的配置文件 config 下的 elasticsearch.yml,下面以第一份配置文件为例。

三个节点的配置文件几乎一致,除了:node.name、path.data、path.logs、http.port、transport.tcp.port。

node-01:

# 允许跨域名访问 http.cors.enabled: true # 当设置允许跨域,默认为*,表示支持所有域名 http.cors.allow-origin: "*" # 允许所有节点访问 network.host: 0.0.0.0 # 集群的名称,同一个集群下所有节点的集群名称应该一致 cluster.name: renda-elastic # 当前节点名称 每个节点不一样 node.name: node-01 # 数据的存放路径 每个节点不一样,不同 es 服务器对应的 data 和 log 存储的路径不能一样 path.data: e:\class\es-9201\data # 日志的存放路径 每个节点不一样 path.logs: e:\class\es-9201\logs # http协议的对外端口 每个节点不一样,默认:9200 http.port: 9201 # TCP协议对外端口 每个节点不一样,默认:9300 transport.tcp.port: 9301 # 三个节点相互发现,包含自己,使用 tcp 协议的端口号 discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301","127.0.0.1:9302","127.0.0.1:9303"] # 声明大于几个的投票主节点有效,请设置为(nodes / 2) 1 discovery.zen.minimum_master_nodes: 2 # 是否为主节点 node.master: true

node-02:

# 允许跨域名访问 http.cors.enabled: true http.cors.allow-origin: "*" network.host: 0.0.0.0 # 集群的名称 cluster.name: renda-elastic # 当前节点名称 每个节点不一样 node.name: node-02 # 数据的存放路径 每个节点不一样 path.data: e:\class\es-9202\data # 日志的存放路径 每个节点不一样 path.logs: e:\class\es-9202\logs # http 协议的对外端口 每个节点不一样 http.port: 9202 # TCP 协议对外端口 每个节点不一样 transport.tcp.port: 9302 # 三个节点相互发现 discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301","127.0.0.1:9302","127.0.0.1:9303"] # 声明大于几个的投票主节点有效,请设置为(nodes / 2) 1 discovery.zen.minimum_master_nodes: 2 # 是否为主节点 node.master: true

node-03:

# 允许跨域名访问 http.cors.enabled: true http.cors.allow-origin: "*" network.host: 0.0.0.0 # 集群的名称 cluster.name: renda-elastic # 当前节点名称 每个节点不一样 node.name: node-03 # 数据的存放路径 每个节点不一样 path.data: e:\class\es-9203\data # 日志的存放路径 每个节点不一样 path.logs: e:\class\es-9203\logs # http协议的对外端口 每个节点不一样 http.port: 9203 # TCP协议对外端口 每个节点不一样 transport.tcp.port: 9303 # 三个节点相互发现 discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301","127.0.0.1:9302","127.0.0.1:9303"] # 声明大于几个的投票主节点有效,请设置为(nodes / 2) 1 discovery.zen.minimum_master_nodes: 2 # 是否为主节点 node.master: true

第三步:启动集群

把三个节点分别启动,要确保一个一个地启动。

Chrome 浏览器使用 Head 插件查看节点启动状态,connect http://localhost:9201/。

测试集群中创建索引库

配置 kibana.yml:

# 端口号改为 9201 或 9202 或 9203 都可以 elasticsearch.url: "http://localhost:9201"

再重启 Kibana。

搭建集群以后就要创建索引库了,那么问题来了,当创建一个索引库后,数据会保存到哪个服务节点上呢?如果对索引库分片,那么每个片会在哪个节点呢?

使用 ElasticSearch-Head 创建新的 Index:名称为 renda,分片数为 3,副本为 1。

对比创建索引库的 API 示例:

PUT /renda { "settings": { "number_of_shards": 3, "number_of_replicas": 1 } }

这里有两个配置:

通过 chrome 浏览器的 head 插件查看,可以查看到分片的存储结构。

可以看到,renda 这个索引库,有三个分片,分别是 0、1、2,每个分片有 1 个副本,共 6 份。

集群工作原理Shard 与 Replica 机制

1)一个 index 包含多个 shard,也就是一个 index 存在多个服务器上。

2)每个 shard 都是一个最小工作单元,承载部分数据,比如有三台服务器,现在有三条数据,这三条数据在三台服务器上各方一条。

3)增减节点时,shard 会自动在 nodes 中负载均衡。

4)primary shard(主分片)和 replica shard(副本分片),每个 document 肯定只存在于某一个 primary shard 以及其对应的 replica shard 中,不可能存在于多个 primary shard。

5)replica shard 是 primary shard 的副本,负责容错,以及承担读请求负载。

6)primary shard 的数量在创建索引的时候就固定了,replica shard 的数量可以随时修改。

7)primary shard 的默认数量是 5,replica 默认是 1(每个主分片一个副本分片),默认有 10 个 shard,5 个 primary shard,5 个 replica shard。

8)primary shard 不能和自己的 replica shard 放在同一个节点上(否则节点宕机,primary shard 和副本都丢失,起不到容错的作用),但是可以和其他 primary shard 的 replica shard 放在同一个节点上。

集群写入数据
  1. 客户端选择一个 node 发送请求过去,这个 node 就是 coordinating node (协调节点)。
  2. Coordinating node,对document进行路由,将请求转发给对应的node(根据一定的算法选择对应的节点进行存储)。
  3. 实际上的 node 上的 primary shard 处理请求,将数据保存在本地,然后将数据同步到 replica node。
  4. Coordinating node,如果发现 primary node 和所有的 replica node 都搞定之后,就会返回请求到客户端。

这个路由简单的说就是取模算法,比如说现在有 3 台服务器,这个时候传过来的 id 是 5,那么 5 % 3 = 2,就放在第 2 台服务器。

ES 查询数据倒排序算法

倒排序算法:通过分词把词语出现的 id 进行记录下来,再查询的时候先去查到哪些 id 包含这个数据,然后再根据 id 把数据查出来。

查询过程
  1. 客户端发送一个请求给 coordinate node 协调节点。
  2. 协调节点将搜索的请求转发给所有的 shard 对应的 primary shard 或 replica shard。
  3. Query phase(查询阶段),每一个 shard 将自己搜索的结果(其实也就是一些唯一标识),返回给协调节点,由协调节点进行数据的合并,排序,分页等操作,产出最后的结果。
  4. Fetch phase(获取阶段),接着由协调节点,根据唯一标识去各个节点进行拉取数据,最终返回给客户端。
Elasticsearch 客户端客户端介绍

在elasticsearch官网中提供了各种语言的客户端:https://www.elastic.co/guide/en/elasticsearch/client/index.html

注意选择版本为 6.2.4 ,与之前的版本保持一致。

创建 Demo 工程初始化项目

使用 Spring Initializr 初始化项目 elasticsearch-demo --> 选择 Developer Tools 的 Spring Boot DevTools、Lombok,Web 的 Spring Web。

POM 文件

注意,这里直接导入了 SpringBoot 的启动器,方便后续整合 Spring Data Elasticsearch,不过还需要手动引入 Elasticsearch 的 High-level-Rest-Client 的依赖。

另外还要注意确保 spring boot 版本号与 es client 相对应,否则运行时会报创建 elasticsearchRestHighLevelClient 的错误;如果出现了这种错误,就需要 Maven clean 一下项目,然后确保版本号正确后再重新运行。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.renda</groupId> <artifactId>elasticsearch-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>elasticsearch-demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency> <!-- Apache 开源组织提供的用于操作 JAVA BEAN 的工具包 --> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.9.1</version> </dependency> <!-- ES 高级 Rest Client --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.4.3</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

配置文件

在 resource 下创建 application.yml。

索引库及映射

创建索引库的同时,也会创建 type 及其映射关系,但是这些操作不建议使用 java 客户端完成,原因如下:

因此,这些操作建议还是使用 Rest 风格 API 去实现。

以一个商品数据为例来创建索引库:

com.renda.pojo.Product

@Data public class Product { ​ private Long id; ​ private String title; // 标题 ​ private String category; // 分类 ​ private String brand; // 品牌 ​ private Double price; // 价格 ​ private String images; // 图片地址 ​ }

分析一下数据结构:

可以编写这样的映射配置:

PUT /renda { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "item": { "properties": { "id": { "type": "keyword" }, "title": { "type": "text", "analyzer": "ik_max_word" }, "category": { "type": "keyword" }, "brand": { "type": "keyword" }, "images": { "type": "keyword", "index": false }, "price": { "type": "double" } } } } }

索引数据操作

有了索引库,接下来看看如何新增索引数据。

操作 MySQL 数据库:

初始化客户端

完成任何操作都需要通过 HighLevelRestClient 客户端。

编写一个测试类:

com.renda.ElasticsearchDemoApplicationTests

@SpringBootTest @RunWith(SpringRunner.class) class ElasticsearchDemoApplicationTests { private RestHighLevelClient restHighLevelClient; /** * 初始化客户端 */ @Before public void init() { RestClientBuilder restClientBuilder = RestClient.builder( new HttpHost("127.0.0.1", 9201, "http"), new HttpHost("127.0.0.1", 9202, "http"), new HttpHost("127.0.0.1", 9203, "http") ); restHighLevelClient = new RestHighLevelClient(restClientBuilder); } /** * 关闭客户端 */ @After public void close() throws IOException { restHighLevelClient.close(); } }

新增文档

示例:

com.renda.ElasticsearchDemoApplicationTests

package com.renda; import com.google.gson.Gson; import com.renda.pojo.Product; import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.io.IOException; @RunWith(SpringRunner.class) @SpringBootTest public class ElasticsearchDemoApplicationTests { private RestHighLevelClient restHighLevelClient; private Gson gson = new Gson(); ... /** * 插入文档 */ @Test public void testInsert() throws IOException { // 1.文档数据 Product product = new Product(); product.setBrand("华为"); product.setCategory("手机"); product.setId(1L); product.setImages("http://image.huawei.com/1.jpg"); product.setPrice(5999.99); product.setTitle("华为P30"); // 2.将文档数据转换为 json 格式 String source = gson.toJson(product); // 3.创建索引请求对象 访问哪个索引库、哪个 type、指定文档 ID // public IndexRequest(String index, String type, String id) IndexRequest request = new IndexRequest("renda", "item", product.getId().toString()); request.source(source, XContentType.JSON); // 4.发出请求 IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT); System.out.println(response); } }

看下响应:

IndexResponse[ index=renda, type=item, id=1, version=2, result=updated, seqNo=1, primaryTerm=1, shards={ "total":2, "successful":2, "failed":0 } ]

查看文档

根据 Rest 风格,查看应该是根据 id 进行 get 查询,难点是对结果的解析:

... /** * 查看文档 */ @Test public void testView() throws IOException { // 初始化 GetRequest 对象 GetRequest getRequest = new GetRequest("renda", "item", "1"); // 执行查询 GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT); // 取出数据 String source = getResponse.getSourceAsString(); Product product = gson.fromJson(source, Product.class); System.out.println(product); } ...

结果:

Product( id=1, title=华为P30, category=手机, brand=华为, price=5999.99, images=http://image.huawei.com/1.jpg )

修改文档

新增时,如果传递的 id 是已经存在的,则会完成修改操作,如果不存在,则是新增。

删除文档

根据 id 删除:

/** * 删除文档 */ @Test public void testDelete() throws IOException { // 初始化 DeleteRequest 对象 DeleteRequest request = new DeleteRequest("renda", "item", "1"); // 执行删除 DeleteResponse response = restHighLevelClient.delete(request, RequestOptions.DEFAULT); System.out.println(response); }

结果:

DeleteResponse[ index=renda, type=item, id=1, version=3, result=deleted, shards=ShardInfo { total=2, successful=2, failures=[] } ]

搜索数据查询所有 match_all

/** * 可重用代码 */ public void baseQuery(SearchSourceBuilder sourceBuilder) throws IOException { // 创建搜索请求对象 SearchRequest request = new SearchRequest(); // 查询构建工具 request.source(sourceBuilder); // 执行查询 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 获得查询结果 SearchHits hits = response.getHits(); // 获得文件数组 SearchHit[] hitsHits = hits.getHits(); for(SearchHit searchHit: hitsHits){ String json = searchHit.getSourceAsString(); // 将 json 反序列化为 Product 格式 Product product = gson.fromJson(json, Product.class); System.out.println(product); } } ​ /** * 查看所有文档 */ @Test public void matchAll() throws IOException { // 查询构建工具 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 添加查询条件,执行查询类型 sourceBuilder.query(QueryBuilders.matchAllQuery()); // 调用基础查询方法 baseQuery(sourceBuilder); }

结果示例:

item = Item{id=5, title='荣耀V10', category='手机', brand='华为', price=2799.0, images='http://image.renda.com/13123.jpg'} item = Item{id=2, title='坚果手机R1', category='手机', brand='锤子', price=3699.0, images='http://image.renda.com/13123.jpg'} item = Item{id=4, title='小米Mix2S', category='手机', brand='小米', price=4299.0, images='http://image.renda.com/13123.jpg'} item = Item{id=1, title='小米手机7', category='手机', brand='小米', price=3299.0, images='http://image.renda.com/13123.jpg'} item = Item{id=3, title='华为META10', category='手机', brand='华为', price=4499.0, images='http://image.renda.com/13123.jpg'}

注意,上面的代码中,搜索条件是通过 sourceBuilder.query(QueryBuilders.matchAllQuery()) 来添加的。这个 query() 方法接受的参数是: QueryBuilder 接口类型。

这个接口提供了很多实现类,分别对应不同类型的查询,例如:term 查询、match 查询、range 查询、boolean 查询等。

因此,如果要使用各种不同查询,其实仅仅是传递给 sourceBuilder.query() 方法的参数不同而已。而这些实现类不需要去 new ,官方提供了 QueryBuilders 工厂帮构建各种实现类。

关键字搜索 match

搜索类型的变化,仅仅是利用 QueryBuilders 构建的查询对象不同而已,其他代码基本一致:

@Test public void matchQuery() throws IOException { SearchSourceBuilder builder = new SearchSourceBuilder(); // 设置查询类型和查询条件 builder.query(QueryBuilders.matchQuery("title", "手机")); // 调用基础查询方法 baseQuery(builder); }

结果示例:

item = Item{id=2, title='坚果手机R1', category='手机', brand='锤子', price=3699.0, images='http://image.renda.com/13123.jpg'} item = Item{id=1, title='小米手机7', category='手机', brand='小米', price=3299.0, images='http://image.renda.com/13123.jpg'}

范围查询 range

RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("price");

支持下面的范围关键字:

示例:

@Test public void rangeQuery() throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 执行查询条件和查询类型 RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("price"); rangeQueryBuilder.gte(3600); rangeQueryBuilder.lte(8300); sourceBuilder.query(rangeQueryBuilder); baseQuery(sourceBuilder); }

结果:

item = Item{id=5, title='荣耀V10', category='手机', brand='华为', price=2799.0, images='http://image.renda.com/13123.jpg'} item = Item{id=2, title='坚果手机R1', category='手机', brand='锤子', price=3699.0, images='http://image.renda.com/13123.jpg'} item = Item{id=1, title='小米手机7', category='手机', brand='小米', price=3299.0, images='http://image.renda.com/13123.jpg'}

source 过滤

_source:存储原始文档。

默认情况下,索引库中所有数据都会返回,如果想只返回部分字段,可以通过 source filter 来控制。

@Test public void sourceFilter() throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 执行查询条件和查询类型 RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("price"); rangeQueryBuilder.gte(3600); rangeQueryBuilder.lte(4300); sourceBuilder.query(rangeQueryBuilder); // source 过滤,只保留 id、title、price sourceBuilder.fetchSource(new String[]{"id", "title", "price"}, null); baseQuery(sourceBuilder); }

结果:

item = Item{id=5, title='荣耀V10', category='null', brand='null', price=2799.0, images='null'} item = Item{id=2, title='坚果手机R1', category='null', brand='null', price=3699.0, images='null'} item = Item{id=4, title='小米Mix2S', category='null', brand='null', price=4299.0, images='null'} item = Item{id=1, title='小米手机7', category='null', brand='null', price=3299.0, images='null'} item = Item{id=3, title='华为META10', category='null', brand='null', price=4499.0, images='null'}

排序

依然是通过 sourceBuilder 来配置:

@Test public void sortAndPage() throws IOException { // 创建搜索请求对象 SearchRequest request = new SearchRequest(); // 查询构建工具 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 添加查询条件,执行查询类型 sourceBuilder.query(QueryBuilders.matchAllQuery()); // 执行排序 价格降序排序 sourceBuilder.sort("price", SortOrder.DESC); ​ baseQuery(sourceBuilder); }

结果:

item = Item{id=5, title='荣耀V10', category='手机', brand='华为', price=2799.0, images='http://image.renda.com/13123.jpg'} item = Item{id=1, title='小米手机7', category='手机', brand='小米', price=3299.0, images='http://image.renda.com/13123.jpg'} item = Item{id=2, title='坚果手机R1', category='手机', brand='锤子', price=3699.0, images='http://image.renda.com/13123.jpg'} item = Item{id=4, title='小米Mix2S', category='手机', brand='小米', price=4299.0, images='http://image.renda.com/13123.jpg'} item = Item{id=3, title='华为META10', category='手机', brand='华为', price=4499.0, images='http://image.renda.com/13123.jpg'}

分页

分页需要视图层传递两个参数:

而 elasticsearch 中需要的不是当前页,而是起始位置,有公式可以计算出:

代码:

@Test public void sortAndPage() throws IOException { // 创建搜索请求对象 SearchRequest request = new SearchRequest(); // 查询构建工具 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 添加查询条件,执行查询类型 sourceBuilder.query(QueryBuilders.matchAllQuery()); // 执行排序 价格降序排序 sourceBuilder.sort("price", SortOrder.DESC); // 分页信息 int currentPage = 1; int pageSize = 3; int startPos = (currentPage - 1) * pageSize; //设置分页 sourceBuilder.from(startPos); sourceBuilder.size(3); baseQuery(sourceBuilder); }

结果:

item = Item{id=5, title='荣耀V10', category='手机', brand='华为', price=2799.0, images='http://image.renda.com/13123.jpg'} item = Item{id=1, title='小米手机7', category='手机', brand='小米', price=3299.0, images='http://image.renda.com/13123.jpg'} item = Item{id=2, title='坚果手机R1', category='手机', brand='锤子', price=3699.0, images='http://image.renda.com/13123.jpg'}

当 currentPage 为 2 的时候,结果是:

item = Item{id=4, title='小米Mix2S', category='手机', brand='小米', price=4299.0, images='http://image.renda.com/13123.jpg'} item = Item{id=3, title='华为META10', category='手机', brand='华为', price=4499.0, images='http://image.renda.com/13123.jpg'}

Spring Data Elasticsearch什么是 Spring Data Elasticsearch

Spring Data Elasticsearch - SDE 是 Spring Data 项目下的一个子模块。

Spring Data 的使命是给各种数据访问提供统一的编程接口,不管是关系型数据库(如 MySQL),还是非关系数据库(如 Redis),或者类似 Elasticsearch 这样的索引数据库;从而简化开发人员的代码,提高开发效率。

Spring Data Elasticsearch 的页面:https://projects.spring.io/spring-data-elasticsearch/

特征:

配置 Spring Data Elasticsearch

在 pom 文件中,引入 Spring Data Elasticsearch 的启动器:

<!-- Spring data elasticsearch --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>

然后,只需要在 resources 下的 application.yml 文件,引入 Elasticsearch 的 host 和 port 即可:

spring: data: elasticsearch: cluster-name: renda-elastic cluster-nodes: 127.0.0.1:9301,127.0.0.1:9302,127.0.0.1:9303

需要注意的是,Spring Data Elasticsearch 底层使用的不是 Elasticsearch 提供的 RestHighLevelClient,而是 TransportClient,并不采用 Http 协议通信,而是访问 Elasticsearch 对外开放的 tcp 端口,在之前集群配置中,设置的分别是:9301,9302,9303

确保引导类如下:

@SpringBootApplication public class ElasticsearchDemoApplication { ​ public static void main(String[] args) { SpringApplication.run(ElasticsearchDemoApplication.class, args); } ​ }

另外,SpringBoot 已经配置好了各种 SDE 配置,并且注册了一个 ElasticsearchTemplate 供使用。

索引库操作创建索引库

Pojo 对象:

@Data public class Product { ​ private Long id; ​ private String title; // 标题 ​ private String category; // 分类 ​ private String brand; // 品牌 ​ private Double price; // 价格 ​ private String images; // 图片地址 ​ }

创建一个测试类,然后注入 ElasticsearchTemplate:

@RunWith(SpringRunner.class) @SpringBootTest public class ElasticsearchSpringDataTests { @Autowired private ElasticsearchTemplate template; }

创建索引库的 API 示例:

@RunWith(SpringRunner.class) @SpringBootTest public class ElasticsearchSpringDataTests { @Autowired private ElasticsearchTemplate template; @Test public void createIndex() { template.createIndex(Product.class); } }

运行测试方法,发现报错:Product is not a Document;因为创建索引库需要指定的信息,比如:索引库名、类型名、分片、副本数量、映射信息都没有填写。

自定义工具类类似,SDE 也是通过实体类上的注解来配置索引库信息的,需要在 Product 上添加下面的一些注解:

@Data @NoArgsConstructor @AllArgsConstructor @Document(indexName = "renda", type = "product", shards = 3, replicas = 1) public class Product { @Id private Long id; @Field(type = FieldType.Text, analyzer = "ik_max_word") private String title; // 标题 @Field(type = FieldType.Keyword) private String category; // 分类 @Field(type = FieldType.Keyword) private String brand; // 品牌 @Field(type = FieldType.Double) private Double price; // 价格 @Field(type = FieldType.Keyword, index = false) private String images; // 图片地址 }

@Document:声明索引库配置

@Id:声明实体类的 id @Field:声明字段属性

创建映射

刚才的注解已经把映射关系也配置上了,所以创建映射只需要这样:

@Test public void createType() { template.putMapping(Product.class); }

索引数据 CRUD

SDE 的索引数据 CRUD 并没有封装在 ElasticsearchTemplate 中,而是有一个叫做 ElasticsearchRepository 的接口。

需要自定义接口,继承 ElasticsearchRespository:

com.renda.repository.ProductRepository

package com.renda.repository; import com.renda.pojo.Product; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; /** * 当 SDE 访问索引库时, * 需要定义一个持久层的接口去继承 ElasticsearchRepository 接口即可, * 无需实现 * * @author Renda Zhang * @since 2020-11-10 23:00 */ public interface ProductRepository extends ElasticsearchRepository<Product, Long> { }

创建索引数据

创建索引有单个创建和批量创建之分。

单个创建:

@Test public void insertDocument() { Product product = new Product(6L, "小米手机", "手机", "锤子", 3299.99, "http://image.renda.com/1.jpg"); productRepository.save(product); System.out.println("Successfully Saved"); }

批量创建:

@Test public void insertDocuments() { Product product1 = new Product(2L, "坚果手机", "手机", "phone", 3299.99, "http://image.renda.com/1.jpg"); Product product2 = new Product(3L, "华为手机", "手机", "phone", 3299.99, "http://image.renda.com/1.jpg"); Product product3 = new Product(4L, "苹果手机", "手机", "phone", 3299.99, "http://image.renda.com/1.jpg"); Product product4 = new Product(5L, "索尼手机", "手机", "phone", 3299.99, "http://image.renda.com/1.jpg"); List<Product> list = new ArrayList<>(); list.add(product1); list.add(product2); list.add(product3); list.add(product4); productRepository.saveAll(list); System.out.println("Successfully Saved All"); }

查询索引数据

默认提供了根据 id 查询,查询所有两个功能。

根据 id 查询:

@Test public void findById() { Optional<Product> optional = productRepository.findById(3L); // orElse 方法的作用:如果 optional 中封装的实体对象为空也就是没有从索引库中查询出匹配的文档,返回 orElse 方法的参数 Product product = optional.orElse(null); System.out.println(product); }

结果:

Product(id=3, title=华为手机, category=手机, brand=phone, price=3299.99, images=http://image.renda.com/1.jpg)

查询所有:

@Test public void findAll() { productRepository.findAll().forEach(System.out::println); }

结果:

Product(id=6, title=小米手机, category=手机, brand=锤子, price=3299.99, images=http://image.renda.com/1.jpg) Product(id=2, title=坚果手机, category=手机, brand=phone, price=3299.99, images=http://image.renda.com/1.jpg) Product(id=4, title=苹果手机, category=手机, brand=phone, price=3299.99, images=http://image.renda.com/1.jpg) Product(id=5, title=索尼手机, category=手机, brand=phone, price=3299.99, images=http://image.renda.com/1.jpg) Product(id=3, title=华为手机, category=手机, brand=phone, price=3299.99, images=http://image.renda.com/1.jpg)

自定义方法查询

ProductRepository 提供的查询方法有限,但是它却提供了非常强大的自定义查询功能。

只要遵循 Spring Data 提供的语法,可以任意定义方法声明:

com.renda.repository.ProductRepository

public interface ProductRepository extends ElasticsearchRepository<Product, Long> { /** * 查询价格范围 */ List<Product> findByPriceBetween(Double from, Double to); }

无需写实现,SDE 会自动实现该方法,直接用即可:

@Test public void findByPrice() { List<Product> list = productRepository.findByPriceBetween(2000.00, 4000.00); System.out.println(list.size()); }

支持的一些语法示例:

And findByNameAndPrice {"bool" : {"must" : [ {"field" : {"name" : "?"}}, {"field" : {"price" : "?"}} ]}} ​ Or findByNameOrPrice {"bool" : {"should" : [{"field" : {"name" : "?"}}, {"field" : {"price" : "?"}}]}} ​ Is findByName {"bool" : {"must" : {"field" :{"name" : "?"}}}} ​ Not findByNameNot {"bool" : {"must_not" :{"field" : {"name" : "?"}}}} ​ Between findByPriceBetween {"bool" : {"must" : {"range" : {"price" : {"from" : ?, "to" : ?, "include_lower" : true, "include_upper" : true}}}}} ​ LessThanEqual findByPriceLessThan {"bool" : {"must" : {"range" :{"price" : {"from" : null, "to" : ?, "include_lower" : true, "include_upper" : true}}}}} ​ GreaterThanEqual findByPriceGreaterThan {"bool" : {"must" : {"range" : {"price" : {"from" : ?, "to" : null, "include_lower" : true, "include_upper" : true}}}}} ​ Before findByPriceBefore {"bool" : {"must" : {"range" : {"price" : {"from" : null, "to" : ?, "include_lower" : true, "include_upper" : true}}}}} ​ After findByPriceAfter {"bool" : {"must" : {"range" : {"price" : {"from" : ?, "to" : null, "include_lower" : true, "include_upper" : true}}}}} ​ Like findByNameLike {"bool" : {"must" : {"field" : {"name" : {"query" : "?*", "analyze_wildcard" : true}}}}} ​ StartingWith findByNameStartingWith {"bool" : {"must" : {"field" : {"name" : {"query" : "?*", "analyze_wildcard" : true}}}}} ​ EndingWith findByNameEndingWith {"bool" : {"must" : {"field" : {"name" : {"query" : "*?", "analyze_wildcard" : true}}}}} ​ Contains/Containing findByNameContaining {"bool" : {"must" : {"field" : {"name" : {"query" : "**?**", "analyze_wildcard" : true}}}}} ​ In findByNameIn(Collection<String>names) {"bool" : {"must" : {"bool" : {"should" : [ {"field" : {"name" : "?"}}, {"field" : {"name" : "?"}} ]}}}} ​ NotIn findByNameNotIn(Collection<String>names) {"bool" : {"must_not" : {"bool" : {"should" : {"field" : {"name" : "?"}}}}}} ​ Near findByStoreNear Not Supported Yet ! ​ True findByAvailableTrue {"bool" : {"must" : {"field" : {"available" : true}}}} ​ False findByAvailableFalse {"bool" : {"must" : {"field" : {"available" : false}}}} ​ OrderBy findByAvailableTrueOrderByNameDesc {"sort" : [{ "name" : {"order" : "desc"} }],"bool" : {"must" : {"field" : {"available" : true}}}}

原生查询

如果上述接口依然不符合需求,SDE 也支持原生查询,这个时候还是使用 ElasticsearchTemplate。

而查询条件的构建是通过一个名为 NativeSearchQueryBuilder 的类来完成的,不过这个类的底层还是使用 ES 的原生 API 中的 QueryBuilders 、 AggregationBuilders 、 HighlightBuilders 等工具。

需求:查询 title 中包含小米手机的商品,以价格升序排序,分页查询:每页展示 2 条,查询第 1 页;对查询结果进行聚合分析:获取品牌及个数。

示例:

@Test public void nativeQuery() { // 1.构架一个原生查询器 NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder(); // 2.source 过滤 // 2.1 参数:final String[] includes, final String[] excludes // 如果不想执行 source 过滤可以将该行注释 queryBuilder.withSourceFilter(new FetchSourceFilter(new String[0], new String[0])); // 3.查询条件 queryBuilder.withQuery(QueryBuilders.matchQuery("title", "小米手机")); // 4.设置分页和排序规则 queryBuilder.withPageable(PageRequest.of(0, 10, Sort.by(Sort.Direction.ASC, "price"))); // 5.高亮 // ... // 6.聚合 queryBuilder.addAggregation(AggregationBuilders.terms("brandAgg").field("brand")); // 7.查询 AggregatedPage<Product> result = template.queryForPage(queryBuilder.build(), Product.class); ​ // 8. 解析结果 // 获取分页结果 long total = result.getTotalElements(); int totalPages = result.getTotalPages(); List<Product> content = result.getContent(); System.out.println(total " " totalPages); content.forEach(System.out::println); // 获取聚合结果 Aggregations resultAggregations = result.getAggregations(); Terms terms = resultAggregations.get("brandAgg"); terms.getBuckets().forEach(bucket -> { System.out.println("品牌:" bucket.getKeyAsString()); System.out.println("数量:" bucket.getDocCount()); }); }

上述查询没有实现高亮结果,以下实现高亮展示。

1、首先,自定义搜索结果映射:

com.renda.resultMapper.ESSearchResultMapper

package com.renda.resultMapper; ​ import com.google.gson.Gson; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.springframework.data.domain.Pageable; import org.springframework.data.elasticsearch.core.SearchResultMapper; import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage; import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl; ​ import java.util.ArrayList; import java.util.Map; ​ /** * 自定义结果映射,处理高亮 * * @author Renda Zhang * @since 2020-11-10 23:50 */ public class ESSearchResultMapper implements SearchResultMapper { ​ /** * 完成结果映射 * 操作的重点应该是将原有的结果:_source 取出来,放入高亮的数据 * * @return AggregatedPage 需要三个参数进行构建:pageable, List<Product>, 总记录数 */ @Override public <T> AggregatedPage<T> mapResults(SearchResponse searchResponse, Class<T> aClass, Pageable pageable) { // 获得总记录数 SearchHits searchHits = searchResponse.getHits(); if (searchHits.getHits().length <= 0) { return null; } ​ // 记录列表 ArrayList<T> list = new ArrayList<>(); // 获取原始的搜索结果 for (SearchHit hit : searchHits) { // 获取 _source 属性中的所有数据 Map<String, Object> map = hit.getSourceAsMap(); // 获得高亮的字段 Map<String, HighlightField> highlightFields = hit.getHighlightFields(); // 每个高亮字段都需要进行设置 for (Map.Entry<String, HighlightField> highlightField : highlightFields.entrySet()) { // 获得高亮的 key:高亮字段 String key = highlightField.getKey(); // 获得 value:高亮之后的效果 HighlightField value = highlightField.getValue(); // 将高亮字段和文本效果放入到 map 中,覆盖对应数据 map.put(key, value.getFragments()[0].toString()); } // 将 map 转换为对象 // map --> jsonString --> 对象 Gson gson = new Gson(); T t = gson.fromJson(gson.toJson(map), aClass); list.add(t); } ​ // 返回 return new AggregatedPageImpl<>(list, pageable, searchHits.getTotalHits()); } }

2、高亮实现:

@Test public void nativeQuery() { // 1.构架一个原生查询器 NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder(); // 2.source 过滤 // 2.1 参数:final String[] includes, final String[] excludes // 如果不想执行 source 过滤可以将该行注释 queryBuilder.withSourceFilter(new FetchSourceFilter(new String[0], new String[0])); // 3.查询条件 queryBuilder.withQuery(QueryBuilders.matchQuery("title", "小米手机")); // 4.设置分页和排序规则 queryBuilder.withPageable(PageRequest.of(0, 10, Sort.by(Sort.Direction.DESC, "price"))); // 5.高亮 HighlightBuilder.Field field = new HighlightBuilder.Field("title"); field.preTags("<font style='color:red'>"); field.postTags("</font>"); queryBuilder.withHighlightFields(field); // 6.聚合 queryBuilder.addAggregation(AggregationBuilders.terms("brandAgg").field("brand")); // 7.查询 AggregatedPage<Product> result = template.queryForPage(queryBuilder.build(), Product.class, new ESSearchResultMapper()); ​ // 8. 解析结果 // 获取分页结果 long total = result.getTotalElements(); int totalPages = result.getTotalPages(); List<Product> content = result.getContent(); System.out.println(total " " totalPages); content.forEach(System.out::println); }

想了解更多,欢迎关注我的Renda_Zhang

,