JAVA并发基础

网友投稿 309 2022-09-04

JAVA并发基础

CPU多级缓存:

read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用

load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放到工作内存中的变量副本中

write(写入):作用于主内存的变量,他把store操作从工作内存中一个变量的值,传送到主内存的变量中

不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到内存中

如果对一个变脸执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始变量的值。

可以阻塞线程,并保证线程在满足特定的条件下继续执行,线程执行完成之后,在进行其他的处理

阻塞进程,并同一时间控制请求的并发量,控制同时的并发数

添加几个注解:

import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.Target;import java.lang.annotation.RetentionPolicy;//用来标记【不推荐】的类或者写法@Target(ElementType.TYPE)@Retention(RetentionPolicy.SOURCE)public @interface NotRecommend { String value() default "";}

import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.Target;import java.lang.annotation.RetentionPolicy;//用来标记【线程安全】的类或者写法@Target(ElementType.TYPE)@Retention(RetentionPolicy.SOURCE)public @interface NotThreadSafe { String value() default "";}

import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.Target;import java.lang.annotation.RetentionPolicy;//用来标记【推荐】的类或者写法@Target(ElementType.TYPE)@Retention(RetentionPolicy.SOURCE)public @interface Recommend {}

import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.Target;import java.lang.annotation.RetentionPolicy;//用来标记【线程安全】的类或者写法@Target(ElementType.TYPE)@Retention(RetentionPolicy.SOURCE)public @interface ThreadSafe { String value() default "";}

线程不安全:

import com.example.annoations.NotThreadSafe;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;@Slf4j@NotThreadSafepublic class ConcurrencyTest { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static int count=0; public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i{ try{ semaphore.acquire(); //是否允许被执行 add(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}",count); } private static void add(){ count++; }}

线程安全性:

import com.example.annoations.ThreadSafe;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;import java.util.concurrent.atomic.AtomicInteger;@Slf4j@ThreadSafepublic class CountExample2 { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static AtomicInteger count= new AtomicInteger(0); public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i{ try{ semaphore.acquire(); //是否允许被执行 add(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}",count.get()); } private static void add(){ count.incrementAndGet(); }}

import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;import java.util.concurrent.atomic.AtomicLong;import com.example.annoations.ThreadSafe;import lombok.extern.slf4j.Slf4j;@Slf4j@ThreadSafepublic class AtomicExample2 { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static AtomicLong count= new AtomicLong(0); public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i{ try{ semaphore.acquire(); //是否允许被执行 add(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}",count.get()); } private static void add(){ count.incrementAndGet(); }}

import com.example.annoations.ThreadSafe;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;import java.util.concurrent.atomic.LongAdder;@Slf4j@ThreadSafepublic class AtomicExample6 { //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static LongAdder count= new LongAdder(); public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i{ try{ semaphore.acquire(); //是否允许被执行 add(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}",count); } private static void add(){ count.increment(); }}

import com.example.annoations.ThreadSafe;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicReference;@Slf4j@ThreadSafepublic class AtomicExample7 { private static AtomicReference count=new AtomicReference<>(0); public static void main(String[] args) { count.compareAndSet(0,2); count.compareAndSet(0,1); count.compareAndSet(1,3); count.compareAndSet(2,4); count.compareAndSet(3,5); log.info("count:{}",count.get()); }}

import com.example.annoations.ThreadSafe;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;@Slf4j@ThreadSafepublic class AtomicExample8 { //更新指定的类的某个字段的值,字段用volatile 非static 描述的字段 private static AtomicIntegerFieldUpdater updater= AtomicIntegerFieldUpdater.newUpdater(AtomicExample8.class,"count"); @Getter public volatile int count=100; public static void main(String[] args) { AtomicExample8 example8=new AtomicExample8(); if(updater.compareAndSet(example8,100,200)){ log.info("update success 1,{}",example8.getCount()); } if(updater.compareAndSet(example8,100,200)){ log.info("update success 2,{}",example8.getCount()); } else{ log.info("update failed,{}",example8.getCount()); } }}

import com.example.annoations.ThreadSafe;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;import java.util.concurrent.atomic.AtomicBoolean;@Slf4j@ThreadSafepublic class AtomicExample9 { private static AtomicBoolean isHappened=new AtomicBoolean(false);//默认是是否发生 //请求总数 public static int clientTotal=5000; //同时并发执行的线程数 public static int threadTotal=200; public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); final Semaphore semaphore=new Semaphore(threadTotal); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); for(int i=0;i{ try{ semaphore.acquire(); //是否允许被执行 test(); semaphore.release(); //释放信号量 }catch(Exception e){ log.error("exception",e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("isHappened:{}",isHappened.get()); } private static void test(){ if(isHappened.compareAndSet(false,true)); log.info("execute"); }

同步锁。

import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;@Slf4jpublic class SynchronizedExample2 { //修饰一个类 public void test1(int j){ synchronized (this){ for (int i=0;i<10;i++){ log.info("test1{}-{}",j,i); } } } //修改一个静态方法 public static synchronized void test2(int j){ for (int i=0;i<10;i++){ log.info("test2{}-{}",j,i); } } public static void main(String[] args) { SynchronizedExample1 example1=new SynchronizedExample1(); SynchronizedExample1 example2=new SynchronizedExample1(); ExecutorService excutorService= Executors.newCachedThreadPool(); excutorService.execute(()->{ example1.test2(1); }); excutorService.execute(()->{ example2.test2(1); }); }}

原子性对比:

无法保证线程安全:(不具有原子性)

适用场景:作为状态标识量

线程安全性总结:

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:C#委托帮助类
下一篇:Mybaits入门使用
相关文章

 发表评论

暂时没有评论,来抢沙发吧~