当前位置:脚本大全 > > 正文

elasticsearch数据写入原理(Python对ElasticSearch获取数据及操作)

时间:2021-10-16 00:04:31类别:脚本大全

elasticsearch数据写入原理

Python对ElasticSearch获取数据及操作

使用python对elasticsearch获取数据及操作,供大家参考,具体内容如下

version

python :2.7

elasticsearch:6.3

代码:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • #!/usr/bin/env python
  • # -*- coding: utf-8 -*-
  • """
  •   @time  : 2018/7/4
  •   @author : liuxuewen
  •   @site  :
  •   @file  : elasticsearchoperation.py
  •   @software: pycharm
  •   @description: 对elasticsearch数据的操作,包括获取数据,发送数据
  • """
  • import elasticsearch
  • import json
  •  
  • import util_ini_operation
  •  
  • class elasticsearch_data():
  •   def __init__(self,hosts,username,password,maxsize,is_ssl):
  •     # 初始化ini操作脚本,获取配置文件
  •     try:
  •       # 判断请求方式是否ssl加密
  •       if is_ssl == "true":
  •         # 获取证书地址
  •         cert_pem = util_ini_operation.get_ini("config.ini").get_key_value("certs","certs")
  •         es_ssl = elasticsearch.elasticsearch(
  •           # 地址
  •           hosts=hosts,
  •           # 用户名密码
  •           http_auth=(username,password),
  •           # 开启ssl
  •           use_ssl=true,
  •           # 确认有加密证书
  •           verify_certs=true,
  •           # 对应的加密证书地址
  •           client_cert=cert_pem
  •         )
  •         self.es = es_ssl
  •       elif is_ssl == "false":
  •         # 创建普通类型的es客户端
  •         es_ordinary = elasticsearch.elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
  •         self.es = es_ordinary
  •     except exception as e:
  •       print(e)
  •  
  •  
  •   def query_data(self,keywords_list,date):
  •     gte = "now-"+str(date)
  •     query_data = {
  •       # 查询语句
  •       "query": {
  •         "bool": {
  •           "must": [
  •             {
  •               "query_string": {
  •                 "query": keywords_list,
  •                 "analyze_wildcard": true
  •               }
  •             },
  •             {
  •               "range": {
  •                 "@timestamp": {
  •                   "gte": gte,
  •                   "lte": "now",
  •                   "format": "epoch_millis"
  •                 }
  •               }
  •             }
  •           ],
  •           "must_not": []
  •         }
  •       }
  •     }
  •     return query_data
  •  
  •   # 从es获取数据
  •   def get_datas_by_query(self,index_name,keywords,param,date):
  •     '''
  •     :param index_name: 索引名称
  •     :param keywords: 关键字词,数组
  •     :param param: 需要数据条件,例如_source
  •     :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m"
  •     :return: all_datas 返回查询到的所有数据(已经过param过滤)
  •     '''
  •  
  •     all_datas = []
  •     # 遍历所有的查询条件
  •     for keywords_list in keywords:
  •       # dsl语句
  •       query_data = self.query_data(keywords_list,date)
  •       res = self.es.search(
  •         index=index_name,
  •         body=query_data
  •       )
  •       for hit in res['hits']['hits']:
  •         # 获取指定的内容
  •         response = hit[param]
  •         # 添加所有数据到数据集中
  •         all_datas.append(response)
  •     # 返回所有数据内容
  •     return all_datas
  •  
  •   # 当索引不存在创建索引
  •   def create_index(self,index_name):
  •     '''
  •     :param index_name: 索引名称
  •     :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称
  •     '''
  •     # 获取索引的映射
  •     # index_mapping = indexmapping.index_mapping
  •     # # 判断索引是否存在
  •     # if self.es.indices.exists(index=index_name) is not true:
  •     #   # 创建索引
  •     #   res = self.es.indices.create(index=index_name,body=index_mapping)
  •     #   # 返回结果
  •     #   return res
  •     # else:
  •     #   # 返回索引名称
  •     #   return index_name
  •     pass
  •  
  •   # 插入指定的单条数据内容
  •   def insert_single_data(self,index_name,doc_type,data):
  •     '''
  •     :param index_name: 索引名称
  •     :param doc_type: 文档类型
  •     :param data: 需要插入的数据内容
  •     :return: 执行结果
  •     '''
  •     res = self.es.index(index=index_name,doc_type=doc_type,body=data)
  •     return res
  •  
  •   # 向es中新增数据,批量插入
  •   def insert_datas(self,index_name):
  •     '''
  •     :desc 通过读取指定的文件内容获取需要插入的数据集
  •     :param index_name: 索引名称
  •     :return: 插入成功的数据条数
  •     '''
  •     insert_datas = []
  •     # 判断插入数据的索引是否存在
  •     self.createindex(index_name=index_name)
  •     # 获取插入数据的文件地址
  •     data_file_path = self.ini.get_key_value("datafile","datafilepath")
  •     # 获取需要插入的数据集
  •     with open(data_file_path,"r+") as data_file:
  •       # 获取文件所有数据
  •       data_lines = data_file.readlines()
  •       for data_line in data_lines:
  •         # string to json
  •         data_line = json.loads(data_line)
  •         insert_datas.append(data_line)
  •     # 批量处理
  •     res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=true)
  •     return res
  •  
  •   # 从es中在指定的索引中删除指定数据(根据id判断)
  •   def delete_data_by_id(self,index_name,doc_type,id):
  •     '''
  •     :param index_name: 索引名称
  •     :param index_type: 文档类型
  •     :param id: 唯一标识id
  •     :return: 删除结果信息
  •     '''
  •     res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
  •     return res
  •  
  •   # 根据条件删除数据
  •   def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
  •     '''
  •     :param index_name:索引名称,为空查询所有索引
  •     :param doc_type:文档类型,为空查询所有文档类型
  •     :param param:过滤条件值
  •     :param gt_time:时间范围,大于该时间
  •     :param lt_time:时间范围,小于该时间
  •     :return:执行条件删除后的结果信息
  •     '''
  •     # dsl语句
  •     query_data = {
  •       # 查询语句
  •       "query": {
  •         "bool": {
  •           "must": [
  •             {
  •               "query_string": {
  •                 "query": param,
  •                 "analyze_wildcard": true
  •               }
  •             },
  •             {
  •               "range": {
  •                 "@timestamp": {
  •                   "gte": gt_time,
  •                   "lte": lt_time,
  •                   "format": "epoch_millis"
  •                 }
  •               }
  •             }
  •           ],
  •           "must_not": []
  •         }
  •       }
  •     }
  •     res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=true)
  •     return res
  •  
  •   # 指定index中删除指定时间段内的全部数据
  •   def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
  •     '''
  •     :param index_name:索引名称,为空查询所有索引
  •     :param doc_type:文档类型,为空查询所有文档类型
  •     :param gt_time:时间范围,大于该时间
  •     :param lt_time:时间范围,小于该时间
  • 标签:

    猜您喜欢