小工具      在线工具  汉语词典  css  js  c++  java

高并发基础之Java并发包

Java 额外说明

收录于:42天前

转自:https://www.cnblogs.com/sessionbest/articles/8689297.html

摘要:本系列以炼金成金课程为基础。为了更好的学习,做了一系列的记录。本文主要介绍:1.各种同步控制工具的使用2.并发容器及典型源码分析

【高并发Java 2】多线程基础知识中,我们已经初步提到了基本的线程同步操作。这次要提到的是在并发包中的同步控制工具。

1. 各种同步控制工具的使用

1.1 ReentrantLock 

ReentrantLock 感觉像是synchronized 的增强版。 Synchronized的特点是使用方便,一切都交给JVM,但功能比较弱。在JDK1.5之前,ReentrantLock的性能比synchronized要好。由于JVM的优化,在目前的JDK版本中,两者的性能不相上下。如果是简单的实现,就不要刻意使用ReentrantLock。

与synchronized相比,ReentrantLock的功能更加丰富。它具有可重入、可中断、限时、公平锁等特点。

首先我们通过一个例子来说明ReentrantLock的初步使用:

package test;

import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable
{
	public static ReentrantLock lock = new ReentrantLock();
	public static int i = 0;

	@Override
	public void run()
	{
		for (int j = 0; j < 10000000; j++)
		{
			lock.lock();
			try
			{
				i++;
			}
			finally
			{
				lock.unlock();
			}
		}
	}
	
	public static void main(String[] args) throws InterruptedException
	{
		Test test = new Test();
		Thread t1 = new Thread(test);
		Thread t2 = new Thread(test);
		t1.start();
		t2.start();
		t1.join();
		t2.join();
		System.out.println(i);
	}

}

有两个线程对i执行++操作。为了保证线程安全,使用了ReentrantLock。从用法可以看出,ReentrantLock比synchronized稍微复杂一些。因为解锁操作必须在finally中进行,如果不在finally中解锁,代码中可能会出现异常而锁没有释放,而synchronized是由JVM来释放锁的。

那么ReentrantLock有哪些优秀的特性呢?

1.1.1 可重入

单个线程可以重复进入,但必须重复退出

lock.lock();
lock.lock();
try
{
	i++;
			
}			
finally
{
	lock.unlock();
	lock.unlock();
}

由于ReentrantLock是可重入锁,因此可以重复获取同一个锁。它有一个与锁相关的获取计数器。如果拥有锁的线程再次获得锁,则获取计数器加1,然后需要释放锁。需要两次才能得到真正的释放(可重入锁)。这模仿了同步的语义;如果线程进入由该线程已经拥有的监视器保护的同步块,则允许该线程继续执行,并且当该线程退出第二个(或后续)同步块时,锁不会被释放,只有该线程退出该锁仅当进入受监视器保护的第一个同步块时才被释放。

public class Child extends Father implements Runnable{
    final static Child child = new Child();//为了保证锁唯一
    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            new Thread(child).start();
        }
    }
 
    public synchronized void doSomething() {
        System.out.println("1child.doSomething()");
        doAnotherThing(); // 调用自己类中其他的synchronized方法
    }
 
    private synchronized void doAnotherThing() {
        super.doSomething(); // 调用父类的synchronized方法
        System.out.println("3child.doAnotherThing()");
    }
 
    @Override
    public void run() {
        child.doSomething();
    }
}
class Father {
    public synchronized void doSomething() {
        System.out.println("2father.doSomething()");
    }
}

我们可以看到,当一个线程进入不同的synchronized方法时,它不会释放之前获得的锁。所以输出仍然是顺序的。所以synchronized也是一个可重入锁

输出:

1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
...

1.1.2.可中断

与synchronized不同的是,ReentrantLock对中断是有响应的。中断相关知识查看【高并发Java 2】多线程基础知识

普通lock.lock()无法响应中断,但是lock.lockInterruptically()可以响应中断。

我们模拟一个死锁场景,然后使用中断来处理死锁

package test;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable
{
	public static ReentrantLock lock1 = new ReentrantLock();
	public static ReentrantLock lock2 = new ReentrantLock();

	int lock;

	public Test(int lock)
	{
		this.lock = lock;
	}

	@Override
	public void run()
	{
		try
		{
			if (lock == 1)
			{
				lock1.lockInterruptibly();
				try
				{
					Thread.sleep(500);
				}
				catch (Exception e)
				{
					// TODO: handle exception
				}
				lock2.lockInterruptibly();
			}
			else
			{
				lock2.lockInterruptibly();
				try
				{
					Thread.sleep(500);
				}
				catch (Exception e)
				{
					// TODO: handle exception
				}
				lock1.lockInterruptibly();
			}
		}
		catch (Exception e)
		{
			// TODO: handle exception
		}
		finally
		{
			if (lock1.isHeldByCurrentThread())
			{
				lock1.unlock();
			}
			if (lock2.isHeldByCurrentThread())
			{
				lock2.unlock();
			}
			System.out.println(Thread.currentThread().getId() + ":线程退出");
		}
	}

	public static void main(String[] args) throws InterruptedException
	{
		Test t1 = new Test(1);
		Test t2 = new Test(2);
		Thread thread1 = new Thread(t1);
		Thread thread2 = new Thread(t2);
		thread1.start();
		thread2.start();
		Thread.sleep(1000);
		//DeadlockChecker.check();
	}

	static class DeadlockChecker
	{
		private final static ThreadMXBean mbean = ManagementFactory
				.getThreadMXBean();
		final static Runnable deadlockChecker = new Runnable()
		{
			@Override
			public void run()
			{
				// TODO Auto-generated method stub
				while (true)
				{
					long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
					if (deadlockedThreadIds != null)
					{
						ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds);
						for (Thread t : Thread.getAllStackTraces().keySet())
						{
							for (int i = 0; i < threadInfos.length; i++)
							{
								if(t.getId() == threadInfos[i].getThreadId())
								{
									t.interrupt();
								}
							}
						}
					}
					try
					{
						Thread.sleep(5000);
					}
					catch (Exception e)
					{
						// TODO: handle exception
					}
				}

			}
		};
		
		public static void check()
		{
			Thread t = new Thread(deadlockChecker);
			t.setDaemon(true);
			t.start();
		}
	}

}

上面的代码可能会导致死锁。线程1获得lock1,线程2获得lock2,然后每个线程都想获得对方的锁。

我们使用jstack查看运行上面代码后的情况

确实发现了僵局。

DeadlockChecker.check();方法用于检测死锁,然后中断死锁线程。中断后,线程正常退出。

1.1.3.时间有限

如果超时后无法获取锁,则返回false,不会出现永远等待导致的死锁。

使用lock.tryLock(long timeout, TimeUnit unit)实现限时锁。参数是时间和单位。

举个例子来说明时间限制:

package test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable
{
	public static ReentrantLock lock = new ReentrantLock();

	@Override
	public void run()
	{
		try
		{
			if (lock.tryLock(5, TimeUnit.SECONDS))
			{
				Thread.sleep(6000);
			}
			else
			{
				System.out.println("get lock failed");
			}
		}
		catch (Exception e)
		{
		}
		finally
		{
			if (lock.isHeldByCurrentThread())
			{
				lock.unlock();
			}
		}
	}
	
	public static void main(String[] args)
	{
		Test t = new Test();
		Thread t1 = new Thread(t);
		Thread t2 = new Thread(t);
		t1.start();
		t2.start();
	}

}

使用两个线程来竞争锁。当一个线程获得锁时,它会休眠6秒,每个线程只尝试5秒来获得锁。

所以肯定有一个线程无法获取锁。如果你拿不到,你就放弃。

输出:

get lock failed

1.1.4.公平锁

如何使用:

public ReentrantLock(boolean fair) 

public static ReentrantLock fairLock = new ReentrantLock(true);

一般意义上的锁是不公平的。先到达的线程不一定先获得锁,后到达的线程最后获得锁。不公平的锁可能会导致饥饿。

公平锁的含义就是这个锁可以保证线程先来,先拿到锁。虽然公平锁不会导致饥饿,但是公平锁的性能会比非公平锁差很多。

1.2 Condition

Condition和ReentrantLock的关系类似于synchronized和Object.wait()/signal()

await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线 程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。

awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。 singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法很类似。

这里就不详细介绍了。举个例子来说明:

package test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable
{
	public static ReentrantLock lock = new ReentrantLock();
	public static Condition condition = lock.newCondition();

	@Override
	public void run()
	{
		try
		{
			lock.lock();
			condition.await();
			System.out.println("Thread is going on");
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			lock.unlock();
		}
	}
	
	public static void main(String[] args) throws InterruptedException
	{
		Test t = new Test();
		Thread thread = new Thread(t);
		thread.start();
		Thread.sleep(2000);
		
		lock.lock();
		condition.signal();
		lock.unlock();
	}

}

上面的例子很简单,让一个线程await并让主线程唤醒它。 Condition.await()/signal只能在获得锁后使用。

1.3.Semaphore

对于锁来说,是互斥的。这意味着一旦我获得了锁,任何人都无法再次获得它。

对于Semaphore来说,它允许多个线程同时进入临界区。可以认为是共享锁,但是共享配额是有限的。当配额用完后,其他未获得配额的线程仍然会被阻塞在临界区之外。当金额为1时,等于锁定

这是一个例子:

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class Test implements Runnable
{
	final Semaphore semaphore = new Semaphore(5);
	@Override
	public void run()
	{
		try
		{
			semaphore.acquire();
			Thread.sleep(2000);
			System.out.println(Thread.currentThread().getId() + " done");
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}finally {
			semaphore.release();
		}
	}
	
	public static void main(String[] args) throws InterruptedException
	{
		ExecutorService executorService = Executors.newFixedThreadPool(20);
		final Test t = new Test();
		for (int i = 0; i < 20; i++)
		{
			executorService.submit(t);
		}
	}

}

有一个20个线程的线程池,每个线程都去Semaphore的权限。只有 5 个信号量权限。运行后可以看到批量输出了5个。

当然一个线程也可以同时申请多个权限

public void acquire(int permits) throws InterruptedException

1.4 ReadWriteLock

ReadWriteLock是一种功能区分锁。读和写是两种不同的功能。读与读不互斥,读与写互斥,写与写互斥。

这样的设计增加了并发量,保证了数据安全。

如何使用:

private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); 
private static Lock readLock = readWriteLock.readLock(); 
private static Lock writeLock = readWriteLock.writeLock();

详细例子可以查看 Java实现生产者-消费者问题和读者-写入者问题,这里就不展开了。

1.5 CountDownLatch

倒数计时器
一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程
,等待所有检查线程全部完工后,再执行

如何使用:

static final CountDownLatch end = new CountDownLatch(10);
end.countDown(); 
end.await();

原理图,示意图:

一个简单的例子:

package test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test implements Runnable
{
	static final CountDownLatch countDownLatch = new CountDownLatch(10);
	static final Test t = new Test();
	@Override
	public void run()
	{
		try
		{
			Thread.sleep(2000);
			System.out.println("complete");
			countDownLatch.countDown();
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws InterruptedException
	{
		ExecutorService executorService = Executors.newFixedThreadPool(10);
		for (int i = 0; i < 10; i++)
		{
			executorService.execute(t);
		}
		countDownLatch.await();
		System.out.println("end");
		executorService.shutdown();
	}

}

主线程必须等待所有10个线程执行完毕才能输出“end”。

1.6 CyclicBarrier

与 CountDownLatch 类似,它也会等待某些线程完成后再执行。与CountDownLatch的不同之处在于该计数器可以重复使用。例如,假设我们将计数器设置为10。那么在收集第一批10个线程后,计数器将返回到零,然后将收集下一批10个线程。

如何使用:

public CyclicBarrier(int parties, Runnable barrierAction) 

barrierAction就是当计数器一次计数完成后,系统会执行的动作

await()

原理图,示意图:

这是一个例子:

package test;

import java.util.concurrent.CyclicBarrier;

public class Test implements Runnable
{
	private String soldier;
	private final CyclicBarrier cyclic;

	public Test(String soldier, CyclicBarrier cyclic)
	{
		this.soldier = soldier;
		this.cyclic = cyclic;
	}

	@Override
	public void run()
	{
		try
		{
			//等待所有士兵到齐
			cyclic.await();
			dowork();
			//等待所有士兵完成工作
			cyclic.await();
		}
		catch (Exception e)
		{
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	private void dowork()
	{
		// TODO Auto-generated method stub
		try
		{
			Thread.sleep(3000);
		}
		catch (Exception e)
		{
			// TODO: handle exception
		}
		System.out.println(soldier + ": done");
	}

	public static class BarrierRun implements Runnable
	{

		boolean flag;
		int n;

		public BarrierRun(boolean flag, int n)
		{
			super();
			this.flag = flag;
			this.n = n;
		}

		@Override
		public void run()
		{
			if (flag)
			{
				System.out.println(n + "个任务完成");
			}
			else
			{
				System.out.println(n + "个集合完成");
				flag = true;
			}

		}

	}

	public static void main(String[] args)
	{
		final int n = 10;
		Thread[] threads = new Thread[n];
		boolean flag = false;
		CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n));
		System.out.println("集合");
		for (int i = 0; i < n; i++)
		{
			System.out.println(i + "报道");
			threads[i] = new Thread(new Test("士兵" + i, barrier));
			threads[i].start();
		}
	}

}

打印结果:

集合
0报道
1报道
2报道
3报道
4报道
5报道
6报道
7报道
8报道
9报道
10个集合完成
士兵5: done
士兵7: done
士兵8: done
士兵3: done
士兵4: done
士兵1: done
士兵6: done
士兵2: done
士兵0: done
士兵9: done
10个任务完成

1.7 LockSupport

提供线程阻塞原语

类似于暂停

LockSupport.park(); 
LockSupport.unpark(t1);

与 suspend 相比,不太可能导致线程冻结

LockSupport的思想有点类似于Semaphore。有内部许可。停车时取消该权限,出车时申请该权限。因此,如果 unpark 先于 Park,则不会发生线程冻结。

下面的代码是【高并发Java 2】多线程基础知识中suspend示例代码,在使用suspend时会发生死锁。

package test;

import java.util.concurrent.locks.LockSupport;
 
public class Test
{
    static Object u = new Object();
    static TestSuspendThread t1 = new TestSuspendThread("t1");
    static TestSuspendThread t2 = new TestSuspendThread("t2");
 
    public static class TestSuspendThread extends Thread
    {
        public TestSuspendThread(String name)
        {
            setName(name);
        }
 
        @Override
        public void run()
        {
            synchronized (u)
            {
                System.out.println("in " + getName());
                //Thread.currentThread().suspend();
                LockSupport.park();
            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        t1.start();
        Thread.sleep(100);
        t2.start();
//        t1.resume();
//        t2.resume();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }
}

使用LockSupport不会导致死锁。

此外

Park() 可以响应中断但不会抛出异常。中断响应的结果是park()函数的返回,中断标志可以从Thread.interrupted()中获取。

Park在JDK中被大量使用。当然LockSupport的实现也是使用unsafe.park()来实现的。

public static void park() {
        unsafe.park(false, 0L);
    }

1.8 ReentrantLock 的实现

下面介绍一下ReentrantLock的实现。 ReentrantLock的实现主要由三部分组成:

  • CAS 状态
  • 等待队列
  • 公园()

ReentrantLock的父类中会有一个state变量来表示同步状态。

/**
     * The synchronization state.
     */
    private volatile int state;

使用CAS操作设置状态来获取锁。如果设置为1,则将锁持有者交给当前线程。

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

如果锁定不成功,则会提出申请。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先尝试再次申请tryAcquire,因为此时可能有另一个线程释放了锁。

如果仍然没有申请锁,addWaiter表示将自己添加到等待队列中。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

在此期间,会出现多次尝试申请锁的情况。如果申请仍然失败,申请将被暂停。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

同理,如果在解锁操作的过程中,将锁释放,然后又unparked,这里就不详细讨论了。

2. 并发容器及典型源码分析

2.1 ConcurrentHashMap 

我们知道HashMap不是线程安全的容器。使 HashMap 线程安全的最简单方法是使用 Collections.synchronizedMap,它是 HashMap 的包装器。

public static Map m=Collections.synchronizedMap(new HashMap());

同样对于List,Set也提供了类似的方法。

不过这种方法只适用于并发量比较小的情况。

我们看一下synchronizedMap的实现

private final Map<K,V> m;     // Backing Map
        final Object      mutex;        // Object on which to synchronize

        SynchronizedMap(Map<K,V> m) {
            if (m==null)
                throw new NullPointerException();
            this.m = m;
            mutex = this;
        }

        SynchronizedMap(Map<K,V> m, Object mutex) {
            this.m = m;
            this.mutex = mutex;
        }

        public int size() {
            synchronized (mutex) {return m.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return m.isEmpty();}
        }
        public boolean containsKey(Object key) {
            synchronized (mutex) {return m.containsKey(key);}
        }
        public boolean containsValue(Object value) {
            synchronized (mutex) {return m.containsValue(value);}
        }
        public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }

        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);}
        }
        public V remove(Object key) {
            synchronized (mutex) {return m.remove(key);}
        }
        public void putAll(Map<? extends K, ? extends V> map) {
            synchronized (mutex) {m.putAll(map);}
        }
        public void clear() {
            synchronized (mutex) {m.clear();}
        }

它将HashMap包装在里面,然后给HashMap的每个操作添加synchronized。

由于每个方法都获取相同的锁(互斥量),这意味着 put 和 remove 等操作是互斥的,大大减少了并发量。

我们看一下ConcurrentHashMap是如何实现的

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

ConcurrentHashMap内部有一个Segment段,它将大HashMap分成若干段(小HashMap),然后让数据在每个段上进行散列。这样,多个线程在不同段上的Hash操作必须是线程安全的。所以只需要同步同一个段上的线程,就实现了锁分离,大大提高了并发量。

使用ConcurrentHashMap.size的时候会比较麻烦,因为需要统计每个Segment的数据总和。这时必须对每个段加锁,然后才能进行数据统计。这是分离锁后的一个小缺点,但是size方法不应该被频繁调用。

在实现上,我们尝试使用trylock来代替synchronized和lock.lock。同时我们在HashMap的实现上也做了一些优化。这里我就不提了。

2.2 BlockingQueue

BlockingQueue 不是一个高性能容器。但它是一个非常好的共享数据的容器。这是一个典型的生产者和消费者的实现。

原理图,示意图:

. . .

相关推荐

额外说明

MySQL5.7快速搭建不踩坑 | 配置远程访问

- 作者 :“大数据小禅” - 简介: MySQL5.7数据库快速安装 | 不踩坑 - 简介:对应的依赖包可以通过最下方公众号私信我获取~ 1.MySQL简介 MySQL 是一款安全、跨平台、高效的,并与 PHP、Java 等主流编程语言紧密结合的数据库

额外说明

Oracle 数据泵迁移用户创建 SQL语句

在进行数据泵迁移时,通常是按照用户进行导入导出,因此需要确认当前数据库中存在那些非系统用户! 查看数据库中用户状态为 OPEN 的用户: select username,account_status,created,PROFILE from dba_us

额外说明

数据可视化——如何绘制地图

文章目录 前言 如何绘制地图 添加配置项 根据已有数据绘制地图 整体代码展示 前言 前面我们学习了如何利用提供的数据来对数据进行处理,然后以折线图的形式展现出来,那么今天我将为大家分享如何将提数据以地图的形式展现。 如何绘制地图 前面我们绘制折线图需要用

额外说明

引发0xC0000005内存违例几种可能原因分析

目录 1、概述 2、空指针访问 3、已释放内存的访问 4、内存越界 5、总结

额外说明

C++程序调用开源libcurl库实现SMTP发送邮件的功能

1、本文给出封装的C++类,头文件如下: #pragma once #include <string> #include <vector> #define SKIP_PEER_VERIFICATION #define SKIP_HOS

额外说明

jna 简介、中文文档、中英对照文档 下载

jna 文档 下载链接(含jar包、源码、pom) 组件名称 中英对照-文档-下载链接 中文-文档-下载链接 jna-4.1.0.jar 暂无 jna-4.1.0-API文档-中文版.zip jna-4.3.0.jar jna-4.3.0-API文档-中

额外说明

spring.net 的应用

using System; using System.Collections.Generic; using System.Linq; using System.Web; /// <summary> /// Summary description

额外说明

安装SVN提示please wait while the installer finishes determining your disk space requirements

解决方案1: 我的电脑 --> 启动任务管理器 --> 找到VisualSVN Server,右键 "Stop" 停掉该服务;然后重新安装,问题解决! 解决方案2: XP 下装了360的话,也可能出现此问题,把360有关开启的服务全部关闭,重新安装就好

额外说明

如何从WordPress循环中排除粘性帖子

置顶帖 are a great way to highlight your featured content. However, there are certain places on your website where you don’t need

ads via 小工具