nginx+lua+storm的热点缓存的流量分发策略自动降级

网友投稿 255 2022-11-30

nginx+lua+storm的热点缓存的流量分发策略自动降级

1、在storm中,实时的计算出瞬间出现的热点。

某个storm task,上面算出了1万个商品的访问次数,LRUMap

频率高一些,每隔5秒,去遍历一次LRUMap,将其中的访问次数进行排序,统计出往后排的95%的商品访问次数的平均值

比如说,95%的商品,访问次数的平均值是100

从最前面开始,往后遍历,去找有没有瞬间出现的热点数据

1000,95%的平均值(100)的10倍,这个时候要设定一个阈值,比如说超出95%平均值得n倍,5倍

我们就认为是瞬间出现的热点数据,判断其可能在短时间内继续扩大的访问量,甚至达到平均值几十倍,或者几百倍

当遍历,发现说第一个商品的访问次数,小于平均值的5倍,就安全了,就break掉这个循环

热点数据,热数据,不是一个概念

有100个商品,前10个商品比较热,都访问量在500左右,其他的普通商品,访问量都在200左右,就说前10个商品是热数据

统计出来

预热的时候,将这些热数据放在缓存中去预热就可以了

热点,前面某个商品的访问量,瞬间超出了普通商品的10倍,或者100倍,1000倍,热点

2、storm这里,会直接发送java.io.BufferedReader;import java.io.InputStream;import java.io.InputStreamReader;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Map.Entry;import org.apache.org.apache.org.apache.org.apache.org.apache.org.apache.org.apache.org.apache.org.apache.org.apache.class HttpClientUtils { /** * 发送GET请求 * @param url 请求URL * @return 响应结果 */ @SuppressWarnings("resource") public static String sendGetRequest(String url) { String = null; HttpClient = null; InputStream is = null; BufferedReader br = null; try { // 发送GET请求 = new DefaultHttpClient(); HttpGet = new HttpGet(url); HttpResponse response = // 处理响应 HttpEntity entity = response.getEntity(); if (entity != null) { is = entity.getContent(); br = new BufferedReader(new InputStreamReader(is)); StringBuffer buffer = new StringBuffer(""); String line = null; while ((line = br.readLine()) != null) { buffer.append(line + "\n"); } = buffer.toString(); } } catch (Exception e) { e.printStackTrace(); } finally { try { if(br != null) { br.close(); } if(is != null) { is.close(); } } catch (Exception e2) { e2.printStackTrace(); } } return } /** * 发送post请求 * @param url URL * @param map 参数Map * @return */ @SuppressWarnings({ "rawtypes", "unchecked", "resource" }) public static String sendPostRequest(String url, Map map){ HttpClient = null; HttpPost = null; String result = null; try{ = new DefaultHttpClient(); = new HttpPost(url); //设置参数 List list = new ArrayList(); Iterator iterator = map.entrySet().iterator(); while(iterator.hasNext()){ Entry elem = (Entry) iterator.next(); list.add(new BasicNameValuePair(elem.getKey(), elem.getValue())); } if(list.size() > 0){ UrlEncodedFormEntity entity = new UrlEncodedFormEntity(list, "utf-8"); } HttpResponse response = if(response != null){ HttpEntity resEntity = response.getEntity(); if(resEntity != null){ result = EntityUtils.toString(resEntity, "utf-8"); } } } catch(Exception ex){ ex.printStackTrace(); } finally { } return result; } }

private class HotProductFindThread implements Runnable { @SuppressWarnings("deprecation") public void run() { List> productCountList = new ArrayList>(); List hotProductIdList = new ArrayList(); List lastTimeHotProductIdList = new ArrayList(); while(true) { // 1、将LRUMap中的数据按照访问次数,进行全局的排序 // 2、计算95%的商品的访问次数的平均值 // 3、遍历排序后的商品访问次数,从最大的开始 // 4、如果某个商品比如它的访问量是平均值的10倍,就认为是缓存的热点 try { productCountList.clear(); hotProductIdList.clear(); if(productCountMap.size() == 0) { Utils.sleep(100); continue; } LOGGER.info("【HotProductFindThread打印productCountMap的长度】size=" + productCountMap.size()); // 1、先做全局的排序 for(Map.Entry productCountEntry : productCountMap.entrySet()) { if(productCountList.size() == 0) { productCountList.add(productCountEntry); } else { // 比较大小,生成最热topn的算法有很多种 // 但是我这里为了简化起见,不想引入过多的数据结构和算法的的东西 // 很有可能还是会有漏洞,但是我已经反复推演了一下了,而且也画图分析过这个算法的运行流程了 boolean bigger = false; for(int i = 0; i < productCountList.size(); i++){ Map.Entry topnProductCountEntry = productCountList.get(i); if(productCountEntry.getValue() > topnProductCountEntry.getValue()) { int lastIndex = productCountList.size() < productCountMap.size() ? productCountList.size() - 1 : productCountMap.size() - 2; for(int j = lastIndex; j >= i; j--) { if(j + 1 == productCountList.size()) { productCountList.add(null); } productCountList.set(j + 1, productCountList.get(j)); } productCountList.set(i, productCountEntry); bigger = true; break; } } if(!bigger) { if(productCountList.size() < productCountMap.size()) { productCountList.add(productCountEntry); } } } } LOGGER.info("【HotProductFindThread全局排序后的结果】productCountList=" + productCountList); // 2、计算出95%的商品的访问次数的平均值 int calculateCount = (int)Math.floor(productCountList.size() * 0.95); Long totalCount = 0L; for(int i = productCountList.size() - 1; i >= productCountList.size() - calculateCount; i--) { totalCount += productCountList.get(i).getValue(); } Long avgCount = totalCount / calculateCount; LOGGER.info("【HotProductFindThread计算出95%的商品的访问次数平均值】avgCount=" + avgCount); // 3、从第一个元素开始遍历,判断是否是平均值得10倍 for(Map.Entry productCountEntry : productCountList) { if(productCountEntry.getValue() > 10 * avgCount) { LOGGER.info("【HotProductFindThread发现一个热点】productCountEntry=" + productCountEntry); hotProductIdList.add(productCountEntry.getKey()); if(!lastTimeHotProductIdList.contains(productCountEntry.getKey())) { // 将缓存热点反向推送到流量分发的nginx中 String distributeNginxURL = "+ productCountEntry.getKey(); HttpClientUtils.sendGetRequest(distributeNginxURL); // 将缓存热点,那个商品对应的完整的缓存数据,发送请求到缓存服务去获取,反向推送到所有的后端应用nginx服务器上去 String cacheServiceURL = "+ productCountEntry.getKey(); String response = HttpClientUtils.sendGetRequest(cacheServiceURL); List params = new ArrayList(); params.add(new BasicNameValuePair("productInfo", response)); String productInfo = URLEncodedUtils.format(params, HTTP.UTF_8); String[] appNginxURLs = new String[]{ "+ productCountEntry.getKey() + "&" + productInfo, "+ productCountEntry.getKey() + "&" + productInfo }; for(String appNginxURL : appNginxURLs) { HttpClientUtils.sendGetRequest(appNginxURL); } } } } // 4、实时感知热点数据的消失 if(lastTimeHotProductIdList.size() == 0) { if(hotProductIdList.size() > 0) { for(Long productId : hotProductIdList) { lastTimeHotProductIdList.add(productId); } LOGGER.info("【HotProductFindThread保存上次热点数据】lastTimeHotProductIdList=" + lastTimeHotProductIdList); } } else { for(Long productId : lastTimeHotProductIdList) { if(!hotProductIdList.contains(productId)) { LOGGER.info("【HotProductFindThread发现一个热点消失了】productId=" + productId); // 说明上次的那个商品id的热点,消失了 // 发送一个 String url = "+ productId; HttpClientUtils.sendGetRequest(url); } } if(hotProductIdList.size() > 0) { lastTimeHotProductIdList.clear(); for(Long productId : hotProductIdList) { lastTimeHotProductIdList.add(productId); } LOGGER.info("【HotProductFindThread保存上次热点数据】lastTimeHotProductIdList=" + lastTimeHotProductIdList); } else { lastTimeHotProductIdList.clear(); } } Utils.sleep(5000); } catch (Exception e) { e.printStackTrace(); } } } }

流量分发local uri_args = ngx.req.get_uri_args()local product_id = uri_args["productId"]local cache_ngx = ngx.shared.my_cachelocal hot_product_cache_key = "hot_product_"..product_idcache_ngx:set(hot_product_cache_key, "true", 60 * 60)后端应用local uri_args = ngx.req.get_uri_args()local product_id = uri_args["productId"]local product_info = uri_args["productInfo"]local product_cache_key = "product_info_"..product_idlocal cache_ngx = ngx.shared.my_cachecache_ngx:set(product_cache_key,product_info,60 * 60)

math.randomseed(tostring(os.time()):reverse():sub(1, 7))math.random(1, 2)local uri_args = ngx.req.get_uri_args()local productId = uri_args["productId"]local shopId = uri_args["shopId"]local hosts = {"192.168.31.187", "192.168.31.19"}local backend = ""local hot_product_key = "hot_product_"..productIdlocal cache_ngx = ngx.shared.my_cachelocal hot_product_flag = cache_ngx:get(hot_product_key)if hot_product_flag == "true" then math.randomseed(tostring(os.time()):reverse():sub(1, 7)) local index = math.random(1, 2) backend = " local hash = ngx.crc32_long(productId) local index = (hash % 2) + 1 backend = "requestPath = uri_args["requestPath"]requestPath = "/"..requestPath.."?productId="..productId.."&shopId="..shopIdlocal = require("resty.= resp, err = method = "GET", path = requestPath})if not resp then ngx.say("request error: ", err) returnendngx.say(resp.body)httpc:close()

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Kafka在美团数据平台的实践
下一篇:Java实现MD5加密的方式与实例代码
相关文章

 发表评论

暂时没有评论,来抢沙发吧~