elasticsearch数据写入原理
Python对ElasticSearch获取数据及操作使用python对elasticsearch获取数据及操作,供大家参考,具体内容如下
version
python :2.7
elasticsearch:6.3
代码:
|
#!/usr/bin/env python # -*- coding: utf-8 -*- """ @time : 2018/7/4 @author : liuxuewen @site : @file : elasticsearchoperation.py @software: pycharm @description: 对elasticsearch数据的操作,包括获取数据,发送数据 """ import elasticsearch import json import util_ini_operation class elasticsearch_data(): def __init__( self ,hosts,username,password,maxsize,is_ssl): # 初始化ini操作脚本,获取配置文件 try : # 判断请求方式是否ssl加密 if is_ssl = = "true" : # 获取证书地址 cert_pem = util_ini_operation.get_ini( "config.ini" ).get_key_value( "certs" , "certs" ) es_ssl = elasticsearch.elasticsearch( # 地址 hosts = hosts, # 用户名密码 http_auth = (username,password), # 开启ssl use_ssl = true, # 确认有加密证书 verify_certs = true, # 对应的加密证书地址 client_cert = cert_pem ) self .es = es_ssl elif is_ssl = = "false" : # 创建普通类型的es客户端 es_ordinary = elasticsearch.elasticsearch(hosts, http_auth = (username, password), maxsize = int (maxsize)) self .es = es_ordinary except exception as e: print (e) def query_data( self ,keywords_list,date): gte = "now-" + str (date) query_data = { # 查询语句 "query" : { "bool" : { "must" : [ { "query_string" : { "query" : keywords_list, "analyze_wildcard" : true } }, { "range" : { "@timestamp" : { "gte" : gte, "lte" : "now" , "format" : "epoch_millis" } } } ], "must_not" : [] } } } return query_data # 从es获取数据 def get_datas_by_query( self ,index_name,keywords,param,date): ''' :param index_name: 索引名称 :param keywords: 关键字词,数组 :param param: 需要数据条件,例如_source :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m" :return: all_datas 返回查询到的所有数据(已经过param过滤) ''' all_datas = [] # 遍历所有的查询条件 for keywords_list in keywords: # dsl语句 query_data = self .query_data(keywords_list,date) res = self .es.search( index = index_name, body = query_data ) for hit in res[ 'hits' ][ 'hits' ]: # 获取指定的内容 response = hit[param] # 添加所有数据到数据集中 all_datas.append(response) # 返回所有数据内容 return all_datas # 当索引不存在创建索引 def create_index( self ,index_name): ''' :param index_name: 索引名称 :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称 ''' # 获取索引的映射 # index_mapping = indexmapping.index_mapping # # 判断索引是否存在 # if self.es.indices.exists(index=index_name) is not true: # # 创建索引 # res = self.es.indices.create(index=index_name,body=index_mapping) # # 返回结果 # return res # else: # # 返回索引名称 # return index_name pass # 插入指定的单条数据内容 def insert_single_data( self ,index_name,doc_type,data): ''' :param index_name: 索引名称 :param doc_type: 文档类型 :param data: 需要插入的数据内容 :return: 执行结果 ''' res = self .es.index(index = index_name,doc_type = doc_type,body = data) return res # 向es中新增数据,批量插入 def insert_datas( self ,index_name): ''' :desc 通过读取指定的文件内容获取需要插入的数据集 :param index_name: 索引名称 :return: 插入成功的数据条数 ''' insert_datas = [] # 判断插入数据的索引是否存在 self .createindex(index_name = index_name) # 获取插入数据的文件地址 data_file_path = self .ini.get_key_value( "datafile" , "datafilepath" ) # 获取需要插入的数据集 with open (data_file_path, "r+" ) as data_file: # 获取文件所有数据 data_lines = data_file.readlines() for data_line in data_lines: # string to json data_line = json.loads(data_line) insert_datas.append(data_line) # 批量处理 res = self .es.bulk(index = index_name,body = insert_datas,raise_on_error = true) return res # 从es中在指定的索引中删除指定数据(根据id判断) def delete_data_by_id( self ,index_name,doc_type, id ): ''' :param index_name: 索引名称 :param index_type: 文档类型 :param id: 唯一标识id :return: 删除结果信息 ''' res = self .es.delete(index = index_name,doc_type = doc_type, id = id ) return res # 根据条件删除数据 def delete_data_by_query( self ,index_name,doc_type,param,gt_time,lt_time): ''' :param index_name:索引名称,为空查询所有索引 :param doc_type:文档类型,为空查询所有文档类型 :param param:过滤条件值 :param gt_time:时间范围,大于该时间 :param lt_time:时间范围,小于该时间 :return:执行条件删除后的结果信息 ''' # dsl语句 query_data = { # 查询语句 "query" : { "bool" : { "must" : [ { "query_string" : { "query" : param, "analyze_wildcard" : true } }, { "range" : { "@timestamp" : { "gte" : gt_time, "lte" : lt_time, "format" : "epoch_millis" } } } ], "must_not" : [] } } } res = self .es.delete_by_query(index = index_name,doc_type = doc_type,body = query_data,_source = true) return res # 指定index中删除指定时间段内的全部数据 def delete_all_datas( self ,index_name,doc_type,gt_time,lt_time): ''' :param index_name:索引名称,为空查询所有索引 :param doc_type:文档类型,为空查询所有文档类型 :param gt_time:时间范围,大于该时间 :param lt_time:时间范围,小于该时间 猜您喜欢
热门推荐
|