线程池

83次阅读

共计 912 个字符,预计需要花费 3 分钟才能阅读完成。

《别再乱改数据库连接池的大小了》


解释了数据库连接池。
Tomcat 设置过大的数据库连接池:
Tomcat 同时向数据库并发很多连接,会导致大量请求连接到数据库,数据库主机任务很多,CPU 频繁上下文切换,导致查询耗时增加。

而应用程序使用连接池的用途就是,减少应用程序(Tomcat)本身创建连接和销毁的损耗。


数据库主机瓶颈:CPU、磁盘 IO、网络 IO、内存

数据库连接池即为 cpu 同时处理连接的数据量。连接数 = ((核心数 * 2) + 有效磁盘数)
高并发业务,需要的是一个小连接池和一个等待连接的队列。
如果,场景很复杂。比如:
系统同时混合了长事务和短事务,这时,根据上面的公式来计算就很难办了。正确的做法应该是创建两个连接池,一个服务于长事务,一个服务于 ” 实时 ” 查询,也就是短事务。
还有一种情况,比方说一个系统执行一个任务队列,业务上要求同一时间内只允许执行一定数量的任务,这时,我们就应该让并发任务数去适配连接池连接数,而不是连接数大小去适配并发任务数。

对于容器来说,
docker 默认设置下,所有容器可以平等地使用 host CPU 资源并且没有限制。- c 只是改变容器在资源紧张情况下的优先级。
K8S CPU 限制,是限制容器占用 CPU 的时间,爆发后可以多占用 CPU 时间片。

进程是操作系统进行资源(包括 cpu、内存、磁盘 IO 等)分配的最小单位。
线程是 cpu 调度和分配的基本单位。


网站并发和 TPS

系统吞度量要素:
系统吞吐量几个重要參数:QPS,TPS、并发数、响应时间(RT)

QPS:每秒钟 request 数量
TPS:每秒钟事务数量

一般 TPS 是对整个系统来讲的。一个应用系统 1s 能完成多少事务处理,一个事务在分布式处理中,可能会对应多个请求,对于衡量单个接口服务的处理能力,用 QPS 比较多。

吞吐量: 吞吐量是指系统在单位时间内处理请求的数量。
并发数:系统同一时候处理的 request/ 事务数
响应时间:一般取平均响应时间

nginx 每一次成功的请求,记录一条日志。所以统计 nginx 日志条数,应该是统计的请求量,统计一分钟请求数 /60=QPS。

理解了上面三个要素的意义之后,就能推算出它们之间的关系:
QPS = 并发数 / 平均响应时间
或者
并发数 = QPS* 平均响应时间

正文完
 0

线程池

86次阅读

共计 5786 个字符,预计需要花费 15 分钟才能阅读完成。

Java 多线程

线程的同步是 Java 多线程编程的重点和难点,往往让人搞不清楚什么是竞争资源、什么时候需要考虑同步,怎么同步等等问题,当然,这些问题没有很明确的答案,但有些原则问题需要考虑,是否有竞争资源被同时改动的问题?对于同步,在具体的 Java 代码中需要完成以下两个操作:把竞争访问的资源标识为 private;同步哪些修改变量的代码,使用 synchronized 关键字同步方法或代码。当然这不是唯一控制并发安全的途径。synchronized 关键字使用说明 synchronized 只能标记非抽象的方法,不能标识成员变量。

工作原理

线程是进程中的实体,一个进程可以拥有多个线程,一个线程必须有一个父进程。线程不拥有系统资源,只有运行必须的一些数据结构;它与父进程的其它线程共享该进程所拥有的全部资源。线程可以创建和撤消线程,从而实现程序的并发执行。一般,线程具有就绪、阻塞和运行三种基本状态。

在多中央处理器的系统里,不同线程可以同时在不同的中央处理器上运行,甚至当它们属于同一个进程时也是如此。大多数支持多处理器的操作系统都提供编程接口来让进程可以控制自己的线程与各处理器之间的关联度(affinity)。

有时候,线程也称作轻量级进程。就象进程一样,线程在程序中是独立的、并发的执行路径,每个线程有它自己的堆栈、自己的程序计数器和自己的局部变量。但是,与分隔的进程相比,进程中的线程之间的隔离程度要小。它们共享内存、文件句柄和其它每个进程应有的状态。

进程可以支持多个线程,它们看似同时执行,但互相之间并不同步。一个进程中的多个线程共享相同的内存地址空间,这就意味着它们可以访问相同的变量和对象,而且它们从同一堆中分配对象。尽管这让线程之间共享信息变得更容易,但您必须小心,确保它们不会妨碍同一进程里的其它线程。

Java 线程工具和 API 看似简单。但是,编写有效使用线程的复杂程序并不十分容易。因为有多个线程共存在相同的内存空间中并共享相同的变量,所以您必须小心,确保您的线程不会互相干扰。

线程属性

为了正确有效地使用线程,必须理解线程的各个方面并了解 Java 实时系统。必须知道如何提供线程体、线程的生命周期、实时系统如 何调度线程、线程组、什么是幽灵线程(Demo nThread)。

线程体

所有的操作都发生在线程体中,在 Java 中线程体是从 Thread 类继承的 run()方法,或实现 Runnable 接口的类中的 run() 方法。当线程产生并初始化后,实时系统调用它的 run() 方法。run()方法内的代码实现所产生线程的行为,它是线程的主要部分。

线程状态


表示了线程在它的生命周期内的任何时刻所能处的状态以及引起状态改变的方法。这图并不是完整的有限状态图,但基本概括了线程中比较感兴趣和普遍的方面。以下讨论有关线程生命周期以此为据。

●New Thread
产生一个 Thread 对象就生成一个新线程。当线程处于 ” 新线程 ” 状态时,仅仅是一个空线程对象,它还没有分配到系统资源。因此只能启动或终止它。任何其他操作都会引发异常。例如,一个线程调用了 new 方法之后,并在调用 start 方法之前的处于新线程状态,可以调用 start 和 stop 方法。

●Runnable
start()方法产生运行线程所必须的资源,调度线程执行,并且调用线程的 run() 方法。在这时线程处于可运行态。该状态不称为运行态是因为这时的线程并不总是一直占用处理机。特别是对于只有一个处理机的 PC 而言,任何时刻只能有一个处于可运行态的线程占用处理 机。Java 通过调度来实现多线程对处理机的共享。注意,如果线程处于 Runnable 状态,它也有可能不在运行,这是因为还有优先级和调度问题。

●Blocked
当以下事件发生时,线程进入非运行态。
①suspend()方法被调用;
②sleep()方法被调用;
③线程使用 wait()来等待条件变量
④线程处于 I / O 请求的等待。

●Dead
当 run()方法返回,或别的线程调用 stop()方法,线程进入死亡态。通常 Applet 使用它的 stop()方法来终止它产生的所有线程。

线程的本操作:

  • 派生:线程在进程内派生出来,它即可由进程派生,也可由线程派生。
  • 阻塞(Block):如果一个线程在执行过程中需要等待某个事件发生,则被阻塞。
  • 激活(unblock):如果阻塞线程的事件发生,则该线程被激活并进入就绪队列。
  • 调度(schedule):选择一个就绪线程进入执行状态。
  • 结束(Finish):如果一个线程执行结束,它的寄存器上下文以及堆栈内容等将被释放。
线程池的分类
  • FixedThreadPool

ExecutorsnewFixedThreadPool方法创建。它是一种线程数量固定的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。FixedThreadPool只有核心线程,且该核心线程都不会被回收,这意味着它可以更快地响应外界的请求

  • CachedThreadPool

ExecutorsnewCachedThreadPool方法创建,不存在核心线程,只存在数量不定的非核心线程,而且其数量最大值为 Integer.MAX_VALUE。当线程池中的线程都处于活动时(全满), 线程池会创建新的线程来处理新的任务,否则就会利用新的线程来处理新的任务,线程池中的空闲线程都有超时机制,默认超时时长为 60s,超过 60s 的空闲线程就会被回收。和 FixedThreadPool 不同的是,CachedThreadPool 的任务队列其实相当于一个空的集合,这将导致任何任务都会被执行,因为在这种场景下 SynchronousQueue 是不能插入任务的,SynchronousQueue是一个特殊的队列,在很多情况下可以理解为一个无法储存元素的队列。从 CachedThreadPool 的特性看,这类线程比较适合执行大量耗时较小的任务。当整个线程池都处于闲置状态时,线程池中的线程都会因为超时而被停止回收,几乎是不占任何系统资源。

  • ScheduledThreadPool

通过 ExecutorsnewScheduledThreadPool方式创建,核心线程数量是固定的,而非核心线程是没有限制的,并且当非核心线程闲置时它会被立即回收,ScheduledThreadPool这类线程池主要用于执行定时任务和具有固定时期的重复任务

  • SingleThreadExecutor

通过 ExecutorsnewSingleThreadExecutor方法来创建。这类线程池内部只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。SingleThreadExecutor的意义在于统一所有外界任务一个线程中,这使得这些任务之间不需要处理线程同步的问题,

线程池的调度策略

线程的另一个执行特性是同步。线程中所使用的同步控制机制与进程中所使用的同步控制机制相同。

ThreadPoolExecutor 参数详解

ThreadPoolExecutor 有多个构造函数,这里我们详细展开一个参数最多的展开。

new ThreadPoolExecutor(
            CORE_THREAD_COUNT,
            MAX_THREAD_COUNT,
            KEEP_ALIVE,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory
);
  • CORE_THREAD_COUNT
    池中所保存的线程数,包括空闲线程。需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到CORE_THREAD_COUNT。若想一开始就创建所有核心线程需调用 prestartAllCoreThreads 方法。
  • MAX_THREAD_COUNT- 池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程。
  • KEEP_ALIVE
    当线程数大于核心时,多于的空闲线程最多存活时间
  • TimeUnit.SECONDS

keepAliveTime 参数的时间单位。

  • LinkedBlockingQueue 阻塞队列(线程安全)后面会详细介绍
  • ThreadFactory – 执行程序创建新线程时使用的工厂。
LinkedBlockingQueue
    1. LinkedBlockingQueue 不允许元素为 null,这一点在构造方法中也说过了。
    1. 同一时刻,只能有一个线程执行入队操作,因为 putLock 在将元素插入到队列尾部时加锁了
    1. 如果队列满了,那么将会调用 notFull 的 await()方法将该线程加入到 Condition 等待队列中。await()方法就会释放线程占有的锁,这将导致之前由于被锁阻塞的入队线程将会获取到锁,执行到 while 循环处,不过可能因为由于队列仍旧是满的,也被加入到条件队列中。
    1. 一旦一个出队线程取走了一个元素,并通知了入队等待队列中可以释放线程了,那么第一个加入到 Condition 队列中的将会被释放,那么该线程将会重新获得 put 锁,继而执行 enqueue()方法,将节点插入到队列的尾部
    1. 然后得到插入一个节点之前的元素个数,如果队列中还有空间可以插入,那么就通知 notFull 条件的等待队列中的线程。
    1. 通知出队线程队列为空了,因为插入一个元素之前的个数为 0,而插入一个之后队列中的元素就从无变成了有,就可以通知因队列为空而阻塞的出队线程了。
Handler

阻塞队列已满且线程数达到最大值时所采取的饱和策略。java 默认提供了 4 种饱和策略的实现方式:中止、抛弃、抛弃最旧的、调用者运行。

  • 1.Abort策略

默认策略,新任务提交时直接抛出未检查的异常 RejectedExecutionException,该异常可由调用者捕获。

  • 2.CallerRuns策略:为调节机制,既不抛弃任务也不抛出异常,而是将某些任务回退到调用者。不会在线程池的线程中执行新的任务,而是在调用 exector 的线程中运行新的任务。
  • 3.Discard策略:新提交的任务被抛弃。
  • 4.DiscardOldest策略:队列的是“队头”的任务,然后尝试提交新的任务。(不适合工作队列为优先队列场景)
ThreadPoolExecutor 实例

在使用线程池之前我们需要预先定义线程池配置信息,在此之前我们已经详细解释了创建线程池各个参数的意义,这里给出一个比较常规的配置方案。

CPU_COUNT=Runtime.getRuntime().availableProcessors();
CORE_THREAD_COUNT=CPU_COUNT+1;
MAX_THREAD_COUNT=CPU_COUNT*2+1;

之后需要创建一个静态工厂并是实现 newThread(Runnable r) 方法用于创建新线程。其中 AtomicInteger 是线程安全的整型变量用与区分线程号。

private static final ThreadFactory threadFactory=new ThreadFactory() {private AtomicInteger atomicInteger=new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {return new Thread(r,"Thread#"+atomicInteger.getAndIncrement());
        }
    };

测试代码

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {private final static int CPU_COUNT=Runtime.getRuntime().availableProcessors();
    private final static int CORE_THREAD_COUNT=CPU_COUNT+1;
    private final static int MAX_THREAD_COUNT=CPU_COUNT*2+1;
    private final static long KEEP_ALIVE=10l;
    private static final ThreadFactory threadFactory=new ThreadFactory() {private AtomicInteger atomicInteger=new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {return new Thread(r,"Thread#"+atomicInteger.getAndIncrement());
        }
    };
    private static final ThreadPoolExecutor threadPollExecutor=new ThreadPoolExecutor(
            CORE_THREAD_COUNT,
            MAX_THREAD_COUNT,
            KEEP_ALIVE,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
    public static void main(String[] args) {for (int i=0;i<5;i++){Runnable runnable=new Runnable() {
                @Override
                public void run() {
                    int index=0;
                    while (true){System.out.println(Thread.currentThread().getName()+":"+index++);
                    }
                }
            };
            threadPollExecutor.execute(runnable);
        }
    }
}

正文完
 0