C# Elasticsearch帮助类
ElasticsearchConfig
/// /// ES 连接配置 /// public class ElasticsearchConfig { /// /// 节点列表 /// public IEnumerable Nodes { get; set; } /// /// 连接池类型 /// public ElasticsearchConnectionPoolType PoolType { get; set; } = ElasticsearchConnectionPoolType.Static; /// /// 用户名 /// public string UserName { get; set; } /// /// 密码 /// public string Password { get; set; } /// /// 显示调试信息 /// public bool DisableDebugInfo { get; set; } = true; /// /// 抛出异常。默认false,错误信息在每个操作的response中 /// public bool ThrowExceptions { get; set; } = false; /// /// 是否禁用Ping。禁用ping 第一次使用节点或使用被标记死亡的节点进行ping /// public bool DisablePing { get; set; } = true; }
ElasticsearchConfigProvider
/// /// Elasticsearch 配置提供程序 /// public class ElasticsearchConfigProvider : IElasticsearchConfigProvider { /// /// 配置 /// private readonly ElasticsearchConfig _config; /// /// 初始化一个类型的实例 /// /// Elasticsearch 连接配置 public ElasticsearchConfigProvider(ElasticsearchConfig config) { _config = config; } /// /// 获取配置 /// /// public Task GetConfigAsync() { return Task.FromResult(_config); } }
ElasticsearchConnectionPoolType
/// /// ES 连接池类型。 /// 支持ping-说明能够发现节点的状态; /// 支持嗅探-说明能够发现新的节点 /// public enum ElasticsearchConnectionPoolType { /// /// 静态连接池。推荐使用,应用于已知集群,请求时随机请求各个正常节点,支持ping,不支持嗅探 /// Static, /// /// 单节点连接池 /// SingleNode, /// /// 嗅探连接池。可动态嗅探集群,随机请求,支持嗅探、ping /// Sniffing, /// /// 固定连接池。选择一个可用节点作为请求主节点,支持ping,不支持嗅探 /// Sticky, /// /// 固定嗅探连接池。选择一个可用节点作为请求主节点,支持ping,支持嗅探 /// StickySniffing }
ElasticsearchNode
/// /// Elasticsearch 节点 /// public class ElasticsearchNode { /// /// 主机 /// public string Host { get; set; } /// /// 端口号 /// public uint Port { get; set; } /// /// 输出字符串 /// /// public override string ToString() { var port = Port == 0 ? "" : $":{Port}"; var result = $"{Host}{port}".ToLowerInvariant(); return result.IndexOf("StringComparison.OrdinalIgnoreCase) > -1 ? result : $" } }
IElasticsearchConfigProvider
public interface IElasticsearchConfigProvider { /// /// 获取配置 /// /// Task GetConfigAsync(); }
ElasticClientExtensions
/// /// ES客户端() 扩展 /// internal static class ElasticClientExtensions { /// /// 初始化索引映射 /// /// ES客户端 /// 索引名 public static async Task InitializeIndexMapAsync(this IElasticClient client, string indexName) { var newName = indexName + DateTime.Now.Ticks; var result = await client.CreateIndexAsync(newName, t => t.Index(newName).Settings(x => x.NumberOfShards(1).NumberOfReplicas(1).Setting("max_result_window", int.MaxValue))); if (result.Acknowledged) { await client.AliasAsync(x => x.Add(o => o.Index(newName).Alias(indexName))); return; } throw new ElasticsearchException($"创建索引 {indexName} 失败:{result.ServerError.Error.Reason}"); } /// /// 初始化索引映射 /// /// 实体类型 /// ES客户端 /// 索引名 public static async Task InitializeIndexMapAsync(this IElasticClient client, string indexName) where T : class { var newName = indexName + DateTime.Now.Ticks; var result = await client.CreateIndexAsync(newName, t => t.Index(newName) .Settings(o => o.NumberOfShards(1).NumberOfReplicas(1) .Setting("max_result_window", int.MaxValue)) .Mappings(m => m.Map(mm => mm.AutoMap()))); if (result.Acknowledged) { await client.AliasAsync(x => x.Add(o => o.Index(newName).Alias(indexName))); return; } throw new ElasticsearchException($"创建索引 {indexName} 失败:{result.ServerError.Error.Reason}"); } /// /// 初始化索引映射 /// /// 实体类型 /// ES客户端 /// 索引名 public static async Task InitializeIndexMapAsync(this IElasticClient client, string indexName, int numberOfShards, int numberOfReplicas) where T : class { var newName = indexName + DateTime.Now.Ticks; var result = await client.CreateIndexAsync(newName, x => x.Index(newName) .Settings(o => o.NumberOfShards(numberOfShards) .NumberOfReplicas(numberOfReplicas) .Setting("max_result_window", int.MaxValue)) .Mappings(m => m.Map(mm => mm.AutoMap()))); if (result.Acknowledged) { await client.AliasAsync(x => x.Add(o => o.Index(newName).Alias(indexName))); return; } throw new ElasticsearchException($"创建索引 {indexName} 失败:{result.ServerError.Error.Reason}"); } }
HighlightParam
public class HighlightParam { /// /// 高亮字段 /// public string[] Keys { get; set; } /// /// 高亮标签 /// public string PreTags { get; set; } = ""; /// /// 高亮标签 /// public string PostTags { get; set; } = ""; /// /// 高亮字段前缀。 /// 例如:title 高亮值赋值给 h_title /// public string PrefixOfKey { get; set; } = string.Empty; /// /// 是否替换原来的值 /// public bool ReplaceAuto { get; set; } = true; }
IPageParam
public interface IPageParam { /// /// 页数,即第几页,从1开始 /// int Page { get; set; } /// /// 每页显示行数 /// int PageSize { get; set; } /// /// 关键词 /// string Keyword { get; set; } /// /// 获取跳过的行数 /// /// int GetSkipCount(); /// /// 运算符 /// Nest.Operator Operator { get; set; } /// /// 高亮参数 /// HighlightParam Highlight { get; set; } } /// /// 分页参数 /// public class PageParam : IPageParam { /// /// 页数,即第几页,从1开始 /// public int Page { get; set; } /// /// 每页显示行数 /// public int PageSize { get; set; } /// /// 关键词 /// public string Keyword { get; set; } /// /// 获取跳过的行数 /// /// public int GetSkipCount() => (Page - 1) * PageSize; /// /// 运算符 /// public Nest.Operator Operator { get; set; } = Nest.Operator.And; /// /// 高亮参数 /// public HighlightParam Highlight { get; set; } } /// /// 指定字段查询 /// public class PageParamWithSearch : PageParam { /// /// 查询字段列表 /// public string[] SearchKeys { get; set; } }
IQueryResult
/// /// 查询结果 /// /// 实体类型 public interface IQueryResult { /// /// 总行数 /// long TotalCount { get; set; } /// /// 查询占用时间 /// long Took { get; set; } /// /// 数据 /// IEnumerable Data { get; } } /// /// 自定义查询结果 /// /// 实体类型 public class CustomQueryResult : IQueryResult { /// /// 总行数 /// public long TotalCount { get; set; } /// /// 查询占用时间 /// public long Took { get; set; } /// /// 数据 /// public IEnumerable Data { get; set; } }
ElasticsearchClient
/// /// ES客户端 /// public class ElasticsearchClient : IElasticsearchClient { /// /// ES客户端生成器 /// private ElasticsearchClientBuilder _builder; /// /// 配置提供程序 /// private IElasticsearchConfigProvider _configProvider; /// /// 初始化一个类型的实例 /// /// 配置提供程序 public ElasticsearchClient(IElasticsearchConfigProvider configProvider) { _configProvider = configProvider ?? throw new ArgumentNullException(nameof(configProvider)); _builder = new ElasticsearchClientBuilder(configProvider); } /// /// 是否存在指定索引 /// /// 索引名 /// public async Task ExistsAsync(string indexName) { var client = await _builder.GetClientAsync(); var result = await client.IndexExistsAsync(indexName); return result.Exists; } /// /// 添加索引。不映射 /// /// 索引名 public async Task AddAsync(string indexName) { var client = await _builder.GetClientAsync(); if (await ExistsAsync(indexName)) return; await client.InitializeIndexMapAsync(indexName); } /// /// 添加索引。自动映射实体属性 /// /// 实体类型 /// 索引名 public async Task AddAsync(string indexName) where T : class { var client = await _builder.GetClientAsync(); if (await ExistsAsync(indexName)) return; await client.InitializeIndexMapAsync(indexName); } /// /// 添加索引。自动映射实体属性并赋值 /// /// 实体类型 /// 索引名 /// 实体 public async Task AddAsync(string indexName, T entity) where T : class { var client = await _builder.GetClientAsync(); if (!await ExistsAsync(indexName)) await client.InitializeIndexMapAsync(indexName); var response = await client.IndexAsync(entity, x => x.Index(indexName)); if(!response.IsValid) throw new ElasticsearchException($"新增数据[{indexName}]失败 : {response.ServerError.Error.Reason}"); } /// /// 更新索引。 /// 由于是普通的简单更新,当ID已经存在时,则会更新文档,所以这里直接调用index方法(复杂方法待研究) /// /// 实体类型 /// 索引名 /// 实体 public async Task UpdateAsync(string indexName, T entity) where T : class =>await AddAsync(indexName, entity); /// /// 删除索引 /// /// 索引名 public async Task DeleteAsync(string indexName) { var client = await _builder.GetClientAsync(); var response = await client.DeleteIndexAsync(indexName); if (response.Acknowledged) return; } /// /// 删除索引 /// /// 实体类型 /// 索引名 /// 实体 public async Task DeleteAsync(string indexName, T entity) where T : class { var client = await _builder.GetClientAsync(); var response = await client.DeleteAsync(new DeleteRequest(indexName, typeof(T), new Id(entity))); if (response.ServerError == null) return; throw new ElasticsearchException($"删除索引[{indexName}]失败 : {response.ServerError.Error.Reason}"); } /// /// 删除索引 /// /// 实体类型 /// 索引名 /// 主键ID public async Task DeleteAsync(string indexName, long id) where T : class { var client = await _builder.GetClientAsync(); var response = await client.DeleteAsync(DocumentPath.Id(new Id(id)), x => x.Type().Index(indexName)); if (response.ServerError == null) return; throw new ElasticsearchException($"删除索引[{indexName}]失败 : {response.ServerError.Error.Reason}"); } /// /// 查询实体 /// /// 实体类型 /// 索引名 /// 主键ID /// public async Task FindAsync(string indexName, long id) where T : class { var client = await _builder.GetClientAsync(); var response = await client.GetAsync(id, x => x.Type().Index(indexName)); return response?.Source; } /// /// 查询。单一条件查询,一般是精确查询 /// /// 实体类型 /// 索引名 /// 字段名 /// 查询值 /// public async Task> QueryAsync(string indexName, string field, object value) where T : class { if (string.IsNullOrWhiteSpace(field)) return null; var client = await _builder.GetClientAsync(); var searchRequest = new SearchDescriptor() .Index(indexName) .PostFilter(t => t.Term(x => x.Field(field).Value(value))); var response = await client.SearchAsync(searchRequest); return response.Documents; } /// /// 查找实体列表 /// /// 实体类型 /// 索引名 /// 主键值 /// public async Task> FindByIdsAsync(string indexName, params long[] ids) where T : class { var client = await _builder.GetClientAsync(); var searchRequest = new SearchDescriptor().Index(indexName).Query(t => t.Ids(x => x.Values(ids))); var response = await client.SearchAsync(searchRequest); return response.Documents; } /// /// 查找实体列表 /// /// 实体类型 /// 索引名 /// 主键值 /// public async Task> FindByIdsAsync(string indexName, params string[] ids) where T : class { var client = await _builder.GetClientAsync(); var searchRequest = new SearchDescriptor().Index(indexName).Query(t => t.Ids(x => x.Values(ids))); var response = await client.SearchAsync(searchRequest); return response.Documents; } /// /// 查找实体列表 /// /// 实体类型 /// 索引名 /// 主键值 /// public async Task> FindByIdsAsync(string indexName, params Guid[] ids) where T : class { var client = await _builder.GetClientAsync(); var searchRequest = new SearchDescriptor().Index(indexName).Query(q => q.Ids(x => x.Values(ids))); var response = await client.SearchAsync(searchRequest); return response.Documents; } /// /// 分页查询 /// /// 实体类型 /// 分页参数 /// 索引名 /// public async Task> PageQueryAsync(IPageParam param, string indexName) where T : class { if (param == null) { param = new PageParam() { Page = 1, PageSize = 20 }; } var searchRequest = new SearchDescriptor() .Type() .Index(indexName) .From(param.GetSkipCount()) .Size(param.PageSize); if (param is PageParamWithSearch pageSearch) ConfigPageRequest(pageSearch, ref searchRequest); else if(param is PageParam pageParam) ConfigPageRequest(pageParam, ref searchRequest); // 是否需要高亮 bool hasHighlight = param.Highlight?.Keys?.Length > 0; if(hasHighlight) BuildHighLightQuery(param, ref searchRequest); var client = await _builder.GetClientAsync(); var response = await client.SearchAsync(x => searchRequest); //if (hasHighlight) //{ // var listWithHightlight = new List(); // response.Hits.ToList().ForEach(x => // { // if (x.Highlights?.Count > 0) // { // PropertyInfo[] properties = typeof(T).GetProperties(); // foreach (string key in pageParams.Highlight?.Keys) // { // //先得到要替换的内容 // if (x.Highlights.ContainsKey(key)) // { // string value = string.Join("", x.Highlights[key]?.Highlights); // PropertyInfo info = properties.FirstOrDefault(p => p.Name == pageParams.Highlight.PrefixOfKey + key); // //没找到带前缀的属性,则替换之前的 // if (info == null && pageParams.Highlight.ReplaceAuto) // { // info = properties.FirstOrDefault(p => p.Name == key); // } // if (info?.CanWrite == true) // { // if (!string.IsNullOrEmpty(value)) // { // //如果高亮字段不为空,才赋值,否则就赋值成空 // info.SetValue(x.Source, value); // } // } // } // } // } // listWithHightlight.Add(x.Source); // }); //} return new CustomQueryResult() { Data = response.Documents, Took = response.Took, TotalCount = response.Total }; } /// /// 配置指定字段的分页请求 /// private void ConfigPageRequest(PageParamWithSearch param, ref SearchDescriptor searchRequest) where T : class { searchRequest = searchRequest.Query(t=> t.QueryString(x => x.Fields(param.SearchKeys) .Query(param.Keyword) .DefaultOperator(param.Operator))); } /// /// 配置分页请求 /// private void ConfigPageRequest(PageParam param, ref SearchDescriptor searchRequest) where T : class { searchRequest= searchRequest.Query( t=>t.QueryString(q=>q.Query(param.Keyword) .DefaultOperator(param.Operator))); } /// /// 构造高亮查询 /// private void BuildHighLightQuery(IPageParam param, ref SearchDescriptor searchRequest) where T : class { var keysLength = param.Highlight?.Keys?.Length ?? 0; var fieldDescriptor = new Func, IHighlightField>[keysLength]; var keysIndex = 0; foreach (var key in param.Highlight?.Keys) { fieldDescriptor[keysIndex] = hf => hf.Field(key) .HighlightQuery(q => q.Match(m => m.Field(key).Query(param.Keyword))); keysIndex++; } IHighlight highlight = new HighlightDescriptor() .PreTags(param.Highlight.PreTags) .PostTags(param.Highlight.PostTags) .Fields(fieldDescriptor); searchRequest = searchRequest.Highlight(s => highlight); } /// /// 批量保存 /// /// 实体类型 /// 索引名 /// 实体列表 public async Task BulkSaveAsync(string indexName, IEnumerable entities) where T : class { var client = await _builder.GetClientAsync(); if (!await ExistsAsync(indexName)) { await client.InitializeIndexMapAsync(indexName); } var bulk = new BulkRequest(indexName) { Operations = new List() }; foreach (var entity in entities) { bulk.Operations.Add(new BulkIndexOperation(entity)); } var response = await client.BulkAsync(bulk); if (response.Errors) { throw new ElasticsearchException($"批量保存文档在索引 {indexName} 失败:{response.ServerError.Error.Reason}"); } } }
ElasticsearchClientBuilder
/// /// ES客户端生成器 /// internal class ElasticsearchClientBuilder { /// /// ES客户端 /// private IElasticClient _client; /// /// 配置提供程序 /// private readonly IElasticsearchConfigProvider _configProvider; /// /// 对象锁 /// private static object _lock = new object(); /// /// 初始化一个类型的实例 /// /// 配置提供程序 public ElasticsearchClientBuilder(IElasticsearchConfigProvider configProvider) { _configProvider = configProvider; } /// /// 获取ES客户端 /// /// public async Task GetClientAsync() { if (_client == null) { var config = await _configProvider.GetConfigAsync(); lock (_lock) { if (_client == null) { if (config.Nodes == null) throw new ArgumentException("请设置ES客户端节点"); _client = CreateClient(config); } } } return _client; } /// /// 创建ES客户端 /// /// 配置 /// private IElasticClient CreateClient(ElasticsearchConfig config) { var connectionPool = CreateConnectionPool(config); var settings = new ConnectionSettings(connectionPool); ConfigSettings(settings, config); return new ElasticClient(settings); } /// /// 创建连接池 /// /// /// private IConnectionPool CreateConnectionPool(ElasticsearchConfig config) { var nodes = config.Nodes.Select(t => new Uri(t.ToString())).ToList(); switch (config.PoolType) { case ElasticsearchConnectionPoolType.Static: return new StaticConnectionPool(nodes); case ElasticsearchConnectionPoolType.SingleNode: return new SingleNodeConnectionPool(nodes.FirstOrDefault()); case ElasticsearchConnectionPoolType.Sniffing: return new SniffingConnectionPool(nodes); case ElasticsearchConnectionPoolType.Sticky: return new StickyConnectionPool(nodes); case ElasticsearchConnectionPoolType.StickySniffing: return new StickySniffingConnectionPool(nodes, x => 1.0F); default: return new StaticConnectionPool(nodes); } } /// /// 配置连接设置 /// /// 连接设置 /// 配置 private void ConfigSettings(ConnectionSettings settings, ElasticsearchConfig config) { // 启用验证 if (!string.IsNullOrWhiteSpace(config.UserName) && !string.IsNullOrWhiteSpace(config.Password)) settings.BasicAuthentication(config.UserName, config.Password); // 验证证书 //settings.ClientCertificate(""); //settings.ClientCertificates(new System.Security.Cryptography.X509Certificates.X509CertificateCollection()); //settings.ServerCertificateValidationCallback(); // 开启第一次使用时进行嗅探,需连接池支持 //settings.SniffOnStartup(false); // 链接最大并发数 //settings.ConnectionLimit(80); // 标记为死亡节点的超时时间 //settings.DeadTimeout(new TimeSpan(10000)); //settings.MaxDeadTimeout(new TimeSpan(10000)); // 最大重试次数 //settings.MaximumRetries(5); // 重试超时时间,默认是RequestTimeout //settings.MaxRetryTimeout(new TimeSpan(50000)); // 禁用代理自动检测 //settings.DisableAutomaticProxyDetection(true); // 禁用ping,第一次使用节点或使用被标记死亡的节点进行ping settings.DisablePing(config.DisablePing); // ping超时设置 //settings.PingTimeout(new TimeSpan(10000)); // 选择节点 //settings.NodePredicate(node => { return true; }); // 默认操作索引 //settings.DefaultIndex(""); // 字段名规则 与model字段同名 //settings.DefaultFieldNameInferrer(name => name); // 根据Type获取类型名 //settings.DefaultTypeNameInferrer(name => name.Name); // 请求超时设置 //settings.RequestTimeout(new TimeSpan(10000)); // 调试信息 settings.DisableDirectStreaming(config.DisableDebugInfo); //settings.EnableDebugMode((apiCallDetails) => //{ // // 请求完成 返回 apiCallDetails //}); // 抛出异常,默认false,错误信息在每个操作的response中 settings.ThrowExceptions(config.ThrowExceptions); //settings.OnRequestCompleted(apiCallDetails => //{ // // 请求完成 返回apiCallDetails //}); //settings.OnRequestDataCreated(requestData => //{ // // 请求的数据创建完成 返回请求的数据 //}); } }
ElasticsearchException
/// /// Elasticsearch 异常 /// [Serializable] public class ElasticsearchException : Exception { /// /// 初始化一个类型的实例 /// public ElasticsearchException() { } /// /// 初始化一个类型的实例 /// /// 序列号信息 /// 流上下文 public ElasticsearchException(SerializationInfo serializationInfo, StreamingContext context) : base(serializationInfo, context) { } /// /// 初始化一个类型的实例 /// /// 错误消息 public ElasticsearchException(string message) : base(message) { } /// /// 初始化一个类型的实例 /// /// 错误消息 /// 内部异常 public ElasticsearchException(string message, Exception innerException) : base(message, innerException) { } }
IElasticsearchClient
/// /// ES客户端 /// public interface IElasticsearchClient { /// /// 是否存在指定索引 /// /// 索引名 /// Task ExistsAsync(string indexName); /// /// 添加索引。不映射 /// /// 索引名 Task AddAsync(string indexName); /// /// 添加索引。自动映射实体属性 /// /// 实体类型 /// 索引名 Task AddAsync(string indexName) where T : class; /// /// 添加索引。自动映射实体属性并赋值 /// /// 实体类型 /// 索引名 /// 实体 /// Task AddAsync(string indexName, T entity) where T : class; /// /// 更新索引。 /// 由于是普通的简单更新,当ID已经存在时,则会更新文档,所以这里直接调用index方法(复杂方法待研究) /// /// 实体类型 /// 索引名 /// 实体 Task UpdateAsync(string indexName, T entity) where T : class; }
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~