linux cpu占用率如何看
277
2022-09-04
nginx+lua访问流量实时上报kafka
在nginx这一层,接收到访问请求的时候,就把请求的流量上报发送给kafka
storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计
从lua脚本直接创建一个kafka producer,发送数据到kafka
wget install -y unzipunzip lua-resty-kafka-master.zipcp -rf /usr/local/lua-resty-kafka-master/lib/resty /usr/hello/lualibnginx -s reload
lua脚本:
local cjson = require("cjson") local producer = require("resty.kafka.producer") local broker_list = { { host = "192.168.31.187", port = 9092 }, { host = "192.168.31.19", port = 9092 }, { host = "192.168.31.227", port = 9092 }}local log_json = {} log_json["headers"] = ngx.req.get_headers() log_json["uri_args"] = ngx.req.get_uri_args() log_json["body"] = ngx.req.read_body() log_json["= ngx.req. log_json["method"] =ngx.req.get_method() log_json["raw_reader"] = ngx.req.raw_header() log_json["body_data"] = ngx.req.get_body_data() local message = cjson.encode(log_json); local productId = ngx.req.get_uri_args()["productId"]local async_producer = producer:new(broker_list, { producer_type = "async" }) local ok, err = async_producer:send("access-log", productId, message) if not ok then ngx.log(ngx.ERR, "kafka send err:", err) return end
两台机器上都这样做,才能统一上报流量到kafka
bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --replication-factor 1 --partitions 1 --create
bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --from-beginning
(1)kafka在187上的节点死掉了,可能是虚拟机的问题,杀掉进程,重新启动一下
nohup bin/kafka-server-start.sh config/server.properties &
(2)需要在nginx.conf中,8.8.8.8;
(3)需要在kafka中加入advertised.host.name = 192.168.31.187,重启三个kafka进程
(4)需要启动eshop-cache缓存服务,因为nginx中的本地缓存可能不在了
基于storm+kafka完成商品访问次数实时统计拓扑的开发:总结思路:
1、kafka consumer spout
单独的线程消费,写入队列
nextTuple,每次都是判断队列有没有数据,有的话再去获取并发射出去,不能阻塞
2、日志解析bolt
3、商品访问次数统计bolt
4、基于LRUMap完成统计
import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.tuple.Fields;import org.apache.storm.utils.Utils;import com.roncoo.eshop.storm.bolt.LogParseBolt;import com.roncoo.eshop.storm.bolt.ProductCountBolt;import com.roncoo.eshop.storm.spout.AccessLogKafkaSpout;/** * 热数据统计拓扑 * @author Administrator * */public class HotProductTopology { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("AccessLogKafkaSpout", new AccessLogKafkaSpout(), 1); builder.setBolt("LogParseBolt", new LogParseBolt(), 5) .setNumTasks(5) .shuffleGrouping("AccessLogKafkaSpout"); builder.setBolt("ProductCountBolt", new ProductCountBolt(), 5) .setNumTasks(10) .fieldsGrouping("LogParseBolt", new Fields("productId")); Config config = new Config(); if(args != null && args.length > 1) { config.setNumWorkers(3); try { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("HotProductTopology", config, builder.createTopology()); Utils.sleep(30000); cluster.shutdown(); } } }
import java.util.Map;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import com.alibaba.fastjson.JSONObject;/** * 日志解析的bolt * @author Administrator * */public class LogParseBolt extends BaseRichBolt { private static final long serialVersionUID = -8017609899644290359L; private OutputCollector collector; @SuppressWarnings("rawtypes") public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String message = tuple.getStringByField("message"); JSONObject messageJSON = JSONObject.parseObject(message); JSONObject uriArgsJSON = messageJSON.getJSONObject("uri_args"); Long productId = uriArgsJSON.getLong("productId"); if(productId != null) { collector.emit(new Values(productId)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("productId")); }}
import java.util.ArrayList;import java.util.List;import java.util.Map;import org.apache.storm.shade.org.json.simple.JSONArray;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.trident.util.LRUMap;import org.apache.storm.tuple.Tuple;import org.apache.storm.utils.Utils;import com.roncoo.eshop.storm.zk.ZooKeeperSession;/** * 商品访问次数统计bolt * @author Administrator * */public class ProductCountBolt extends BaseRichBolt { private static final long serialVersionUID = -8761807561458126413L; private LRUMap
import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ArrayBlockingQueue;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;/** * kafka消费数据的spout */public class AccessLogKafkaSpout extends BaseRichSpout { private static final long serialVersionUID = 8698470299234327074L; private ArrayBlockingQueue
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;/** * ZooKeeperSession * @author Administrator * */public class ZooKeeperSession { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private ZooKeeper zookeeper; public ZooKeeperSession() { // 去连接zookeeper server,创建会话的时候,是异步去进行的 // 所以要给一个监听器,说告诉我们什么时候才是真正完成了跟zk server的连接 try { this.zookeeper = new ZooKeeper( "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher()); // 给一个状态CONNECTING,连接中 System.out.println(zookeeper.getState()); try { // CountDownLatch // java多线程并发同步的一个工具类 // 会传递进去一些数字,比如说1,2 ,3 都可以 // 然后await(),如果数字不是0,那么久卡住,等待 // 其他的线程可以调用coutnDown(),减1 // 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态 // 继续向下运行 connectedSemaphore.await(); } catch(InterruptedException e) { e.printStackTrace(); } System.out.println("ZooKeeper session established......"); } catch (Exception e) { e.printStackTrace(); } } /** * 获取分布式锁 * @param productId */ public void acquireDistributedLock() { String path = "/taskid-list-lock"; try { zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success to acquire lock for taskid-list-lock"); } catch (Exception e) { // 如果那个商品对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错 // NodeExistsException int count = 0; while(true) { try { Thread.sleep(1000); zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e2) { count++; System.out.println("the " + count + " times try to acquire lock for taskid-list-lock......"); continue; } System.out.println("success to acquire lock for taskid-list-lock after " + count + " times try......"); break; } } } /** * 释放掉一个分布式锁 * @param productId */ public void releaseDistributedLock() { String path = "/taskid-list-lock"; try { zookeeper.delete(path, -1); System.out.println("release the lock for taskid-list-lock......"); } catch (Exception e) { e.printStackTrace(); } } public String getNodeData() { try { return new String(zookeeper.getData("/taskid-list", false, new Stat())); } catch (Exception e) { e.printStackTrace(); } return ""; } public void setNodeData(String path, String data) { try { zookeeper.setData(path, data.getBytes(), -1); } catch (Exception e) { e.printStackTrace(); } } /** * 建立zk session的watcher * @author Administrator * */ private class ZooKeeperWatcher implements Watcher { public void process(WatchedEvent event) { System.out.println("Receive watched event: " + event.getState()); if(KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } } /** * 封装单例的静态内部类 * @author Administrator * */ private static class Singleton { private static ZooKeeperSession instance; static { instance = new ZooKeeperSession(); } public static ZooKeeperSession getInstance() { return instance; } } /** * 获取单例 * @return */ public static ZooKeeperSession getInstance() { return Singleton.getInstance(); } /** * 初始化单例的便捷方法 */ public static void init() { getInstance(); } }
于双重zookeeper分布式锁完成分布式并行缓存预热:
1、服务启动的时候,进行缓存预热
2、从zk中读取taskid列表
3、依次遍历每个taskid,尝试获取分布式锁,如果获取不到,快速报错,不要等待,因为说明已经有其他服务实例在预热了
4、直接尝试获取下一个taskid的分布式锁
5、即使获取到了分布式锁,也要检查一下这个taskid的预热状态,如果已经被预热过了,就不再预热了
6、执行预热操作,遍历productid列表,查询数据,然后写ehcache和redis
7、预热完成后,设置taskid对应的预热状态
ZKsession重载两个方法:
/** * 获取分布式锁 * @param productId */ public boolean acquireFastFailedDistributedLock(String path) { try { zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success to acquire lock for " + path); return true; } catch (Exception e) { System.out.println("fail to acquire lock for " + path); } return false; }/** * 释放掉一个分布式锁 * @param productId */ public void releaseDistributedLock(String path) { try { zookeeper.delete(path, -1); System.out.println("release the lock for " + path + "......"); } catch (Exception e) { e.printStackTrace(); } }public String getNodeData(String path) { try { return new String(zookeeper.getData(path, false, new Stat())); } catch (Exception e) { e.printStackTrace(); } return ""; } public void setNodeData(String path, String data) { try { zookeeper.setData(path, data.getBytes(), -1); } catch (Exception e) { e.printStackTrace(); } }
/** * 获取分布式锁 */ public void acquireDistributedLock(String path) { try { zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success to acquire lock for " + path); } catch (Exception e) { // 如果那个商品对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错 // NodeExistsException int count = 0; while(true) { try { Thread.sleep(1000); zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e2) { count++; System.out.println("the " + count + " times try to acquire lock for " + path + "......"); continue; } System.out.println("success to acquire lock for " + path + " after " + count + " times try......"); break; } } }
import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.roncoo.eshop.cache.model.ProductInfo;import com.roncoo.eshop.cache.service.CacheService;import com.roncoo.eshop.cache.spring.SpringContext;import com.roncoo.eshop.cache.zk.ZooKeeperSession;/** * 缓存预热线程 */public class CachePrewarmThread extends Thread { @Override public void run() { CacheService cacheService = (CacheService) SpringContext. getApplicationContext().getBean("cacheService"); ZooKeeperSession zkSession = ZooKeeperSession.getInstance(); // 获取storm taskid列表 String taskidList = zkSession.getNodeData("/taskid-list"); if(taskidList != null && !"".equals(taskidList)) { String[] taskidListSplited = taskidList.split(","); for(String taskid : taskidListSplited) { String taskidLockPath = "/taskid-lock-" + taskid; boolean result = zkSession.acquireFastFailedDistributedLock(taskidLockPath); if(!result) { continue; } String taskidStatusLockPath = "/taskid-status-lock-" + taskid; zkSession.acquireDistributedLock(taskidStatusLockPath); //检查越热的状态 String taskidStatus = zkSession.getNodeData("/taskid-status-" + taskid); if("".equals(taskidStatus)) { String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid); JSONArray productidJSONArray = JSONArray.parseArray(productidList); for(int i = 0; i < productidJSONArray.size(); i++) { Long productId = productidJSONArray.getLong(i); String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}"; ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class); cacheService.saveProductInfo2LocalCache(productInfo); cacheService.saveProductInfo2ReidsCache(productInfo); } zkSession.setNodeData(taskidStatusLockPath, "success"); } zkSession.releaseDistributedLock(taskidStatusLockPath); zkSession.releaseDistributedLock(taskidLockPath); } } } }
import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.roncoo.eshop.cache.model.ProductInfo;import com.roncoo.eshop.cache.service.CacheService;import com.roncoo.eshop.cache.spring.SpringContext;import com.roncoo.eshop.cache.zk.ZooKeeperSession;/** * 缓存预热线程 */public class CachePrewarmThread extends Thread { @Override public void run() { CacheService cacheService = (CacheService) SpringContext. getApplicationContext().getBean("cacheService"); ZooKeeperSession zkSession = ZooKeeperSession.getInstance(); // 获取storm taskid列表 String taskidList = zkSession.getNodeData("/taskid-list"); if(taskidList != null && !"".equals(taskidList)) { String[] taskidListSplited = taskidList.split(","); for(String taskid : taskidListSplited) { String taskidLockPath = "/taskid-lock-" + taskid; boolean result = zkSession.acquireFastFailedDistributedLock(taskidLockPath); if(!result) { continue; } String taskidStatusLockPath = "/taskid-status-lock-" + taskid; zkSession.acquireDistributedLock(taskidStatusLockPath); //检查越热的状态 String taskidStatus = zkSession.getNodeData("/taskid-status-" + taskid); if("".equals(taskidStatus)) { String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid); JSONArray productidJSONArray = JSONArray.parseArray(productidList); for(int i = 0; i < productidJSONArray.size(); i++) { Long productId = productidJSONArray.getLong(i); String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}"; ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class); cacheService.saveProductInfo2LocalCache(productInfo); cacheService.saveProductInfo2ReidsCache(productInfo); } zkSession.setNodeData(taskidStatusLockPath, "success"); } zkSession.releaseDistributedLock(taskidStatusLockPath); zkSession.releaseDistributedLock(taskidLockPath); } } } }
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~