关于编程:线程Ⅱ

41次阅读

共计 9186 个字符,预计需要花费 23 分钟才能阅读完成。

线程池
为什么要应用线程池?

创立和销毁线程是一个低廉的操作,要消耗大量的工夫。太多的线程还会节约内存资源,因为操作系统必须调度可运行的线程并执行上下文切换,所以太多的线程还会影响性能,而线程池能够改善这些状况。

线程池是什么?

能够将线程池设想成为能够由应用程序应用的一个线程汇合,每个 CLR 一个线程池,这个线程池由 CLR 管制的所有 AppDomain 共享。如果一个过程中加载了多个 CLR,那么每个 CLR 都有本人的线程池。

线程池是如何工作的?

CLR 初始化时,线程池中是没有线程的。在外部,线程池保护了一个操作申请队列,在应用程序想执行一个异步操作时,就调用一个办法,将一个记录项追加到线程池的队列中。线程池的代码从这个队列中提取记录项,将这个记录项差遣给一个线程池线程。此时如果线程池中没有线程,会创立一个新的线程(产生肯定的性能开销)。当线程实现工作后,线程不会被销毁,而会回到线程池且进入闲暇状态,期待响应下一个申请。因为线程不再销毁所以不再产生额定的性能损失。

如果应用程序向线程收回许多申请,线程池会尝试只用已有一个线程解决这些申请。如果发送申请的速度超过了线程池线程解决的速度,就会创立额定的线程来解决。

当应用程序进行向线程池发送求情,此时线程池中的线程什么都不做,造成内存资源的节约。所以有一个机制:当线程池线程闲置一段时间后(不同版本的 CLR 对工夫有所差别),线程会主动唤醒并终止本人开释资源。

特点

线程池能够包容大量线程防止资源节约,也能够创立大量线程充分利用多处理器、超线程处理器以及多核处理器。换句话说线程池是启发式的,如果应用程序要执行许多工作同时又有可用的 CPU,那么线程池会创立更多的线程。

应用线程池异步编程
private static void SomeMethod(object state)
{
    // 办法由线程池线程执行
    Console.WriteLine("state = {0}", state);
    Thread.Sleep(10000);

    // 办法返回后线程回到线程池期待下一个申请
}

static void Main()
{ThreadPool.QueueUserWorkItem(SomeMethod, 1);
    Console.ReadKey();}
执行上下文

每个线程都关联了一个执行上下文数据结构,蕴含有:平安设置、宿主设置以及逻辑调用上下文数据。失常状况下,每当一个线程 (初始线程) 应用另一个线程 (辅助线程) 执行工作时,前者的执行上下文应该复制到辅助线程,这样能够确保辅助线程的任何操作都是应用同样的平安设置和宿主设置,还能保障初始线程的逻辑调用上下文能够在辅助线程中应用。默认状况下初始线程的执行上下文能够流向任何辅助线程,但执行上下文中蕴含大量信息,这会对性能造成肯定的影响。

ExecutionContext 管制执行上下文
static void Main(string[] args)
{
    // 将数据放入 Main 线程的逻辑调用上下文
    CallContext.LogicalSetData("Name", "DoubleJ");

    // 线程池线程能拜访逻辑调用上下文数据
    ThreadPool.QueueUserWorkItem(state => {Console.WriteLine("state = {0}, name = {1}", state, CallContext.LogicalGetData("Name"));
    }, 1);

    // 阻止 Main 线程的执行上下文流动
    ExecutionContext.SuppressFlow();

    // 线程池线程将无法访问逻辑调用上下文数据
    ThreadPool.QueueUserWorkItem(state => {Console.WriteLine("state = {0}, name = {1}", state, CallContext.LogicalGetData("Name"));
    }, 2);

    // 复原 Main 线程的执行上下文流动
    ExecutionContext.RestoreFlow();

    // 线程池线程又能够拜访逻辑调用上下文数据了
    ThreadPool.QueueUserWorkItem(state => {Console.WriteLine("state = {0}, name = {1}", state, CallContext.LogicalGetData("Name"));
    }, 3);

    Console.ReadKey();}
运行后果

工作

ThreadPool 的 QueueUserWorkItem 办法尽管非常简单,然而却没方法晓得操作在什么时候实现以及获取返回值。当初应用工作 (task) 能够补救这些有余。

期待工作实现并获取返回后果
static void Main()
{Task<int> t = new Task<int>(() =>
    {
        int sum = 0;
        for (int i = 0; i < 10000; i++)
            sum += i;
        return sum;
    });

    // 启动工作
    t.Start();

    // 期待工作实现
    t.Wait();

    // 查看返回后果
    Console.WriteLine("result = {0}", t.Result);
    Console.ReadKey();}

线程调用 Wait 办法时,零碎会检测线程要期待的 Task 是否曾经开始执行,如果是,调用 Wait 办法的线程会阻塞,直到 Task 运行完结为止。如果 Task 还没有开始执行,零碎可能会应用调用 Wait 办法的线程来执行 Task,这种状况调用 Wait 的线程将不会阻塞,它会执行 Task 并立刻返回。如果线程在调用 Wait 前曾经取得了一个线程同步锁,而 Task 试图获取同一个锁,就会造成线程死锁。

勾销工作
static void Main()
{var t = new Task<int>(() =>
    {
        int sum = 0;
        for (int i = 0; i < 10000; i++)
        {
            // 如果已勾销则会抛出异样
            cts.Token.ThrowIfCancellationRequested();
            sum += i;
        }
        return sum;
    }, cts.Token);

    t.Start();
    // 异步申请,Task 可能曾经实现了
    cts.Cancel();

    try
    {
        // 如果工作已勾销,Result 会引发 AggregateException
        Console.WriteLine("result = {0}", t.Result);
    }
    catch (AggregateException exp)
    {exp.Handle(e => e is OperationCanceledException);
        Console.WriteLine("工作已勾销");
    }
    Console.ReadKey();}

创立一个工作时能够将一个 CancellationToken 传给 Task 的结构器,从而将 CancellationToken 和该 Task 关联在一起。如果 CancellationToken 在 Task 调度前被勾销,则 Task 永远都不会再执行。

运行后果

一个工作实现时主动启动一个新的工作
private static int Sum(int n)
{
    n += 1;
    Console.WriteLine("n = {0}", n);
    return n;
}

static void Main()
{Task<int> t = new Task<int>(n => Sum((int)n), 0);
    t.Start();
    t.ContinueWith(task => Sum(task.Result));
    Console.ReadKey();}

上述代码执行完工作 (t) 时,会启动另一个工作,执行上述代码的线程不会进入阻塞状态并期待两个工作中的任意一个工作实现,线程能够继续执行其它代码。

运行后果

父工作和子工作
private static int Sum(int n)
{
    n += 1;
    Console.WriteLine("n = {0}", n);
    return n;
}

static void Main()
{Task<int[]> parent = new Task<int[]>(() =>
    {var result = new int[3];
        new Task(() => result[0] = Sum(0), TaskCreationOptions.AttachedToParent).Start();
        new Task(() => { Thread.Sleep(5000); result[1] = Sum(1); }, TaskCreationOptions.AttachedToParent).Start();
        new Task(() => result[2] = Sum(2), TaskCreationOptions.AttachedToParent).Start();
        return result;
    });
    parent.ContinueWith(parentTask => Array.ForEach(parentTask.Result, Console.WriteLine));
    parent.Start();
    Console.ReadKey();}
运行后果

当初改变一行代码,如下:

new Task(() => { Thread.Sleep(5000); result[1] = Sum(1); }, TaskCreationOptions.AttachedToParent).Start();

// 上段代码改为
new Task(() => { Thread.Sleep(5000); result[1] = Sum(1); }).Start();
运行后果

论断
默认状况一个工作创立的 Task 对象是顶级工作,这些工作与创立他们的那个工作没有关联,然而应用 TaskCreationOptions.AttachedToParent 标记将一个 Task 和创立它的那个 Task 关联起来,这样一来除非所有的子工作以及子工作的子工作完结运行,否则父工作就不会认为曾经完结。

工作工厂
private static int Sum(int n)
{
    int sum = 0;
    for (int i = 0; i < n; i++)
    {
        checked
        {sum += i;}
    }
    return sum;
}

static void Main()
{Task parent = new Task(() => {var cts = new CancellationTokenSource();
        var tf = new TaskFactory<int>(
            cts.Token,
            TaskCreationOptions.AttachedToParent,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default
        );

        // 创立并启动子工作
        var childTask = new[]
        {tf.StartNew(() => Sum(1000)),
            tf.StartNew(() => Sum(10000)),
            tf.StartNew(() => Sum(100000))
        };

        // 任何子工作抛出异样就勾销其余子工作
        for (int i = 0; i < childTask.Length; i++)
            childTask[i].ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);

        // 实现所有子工作后
        tf.ContinueWhenAll(
            childTask,
            completedTask => completedTask.Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result),
            CancellationToken.None
        ).ContinueWith(t => Console.WriteLine("max result is : {0}", t.Result
        ), TaskContinuationOptions.ExecuteSynchronously);
    });

    // 子工作实现后
    parent.ContinueWith(p =>
    {foreach (var e in p.Exception.Flatten().InnerExceptions)
            Console.WriteLine("Exception : {0}", e.Message);
    }, TaskContinuationOptions.OnlyOnFaulted);
    parent.Start();
    Console.ReadKey();}
运行后果

Parallel 类的应用
Parallel 的动态 For 办法
// 单线程执行(不倡议)
for (int i = 0; i < 100000; i++)
    DoSomthing(i);

// 并行工作(举荐)
Parallel.For(0, 100000, (i) => DoSomthing(i));
Parallel 的动态 ForEach 办法
var collection = new int[] { 1, 2, 3};
// 不倡议
foreach (var item in collection)
    DoSomthing(item);

// 举荐
Parallel.ForEach(collection, item => DoSomthing(item));

*如果既能够应用 For 也能够应用 ForEach,那么应用 For 办法的速度要比 ForEach 办法快一些

Parallel 的动态 Invoke 办法
// 一个线程程序执行多个办法
Method1();
Method2();
Method3();

// 线程池的线程并行执行办法
Parallel.Invoke(() => Method1(),
    () => Method2(),
    () => Method3()
);

应用 Parallel 的前提是代码必须能够是并行执行的,如果工作肯定要程序执行请勿应用。Parallel 的所有办法都让调用线程参加解决,如果调用线程在线程池线程实现本人的那一部分工作之前实现工作,那么调用线程便会主动挂起,直到所有工作实现后持续。

注意事项

Parallel 尽管好用,但也须要开销,其中必须调配委托对象,而每一个工作项,都要调用一次委托。如果有大量能够应用多线程解决的工作项,那么兴许能晋升性能。再或者每一个工作项都须要波及大量的工作,那么调用委托所造成的性能损失便能够忽略不计。然而如果工作项很少或者每一个工作项都能解决得十分快,这种状况下应用 Parallel 的办法反而会侵害性能。

For,ForEach,Invoke 的参数 ParallelOptions 对象
  • CancellationToken:容许勾销操作,默认为 CancellationToken.None
  • MaxDegreeOfParallelism:指定能够并发执行的最大工作项数量
  • TaskScheduler:指定要应用哪个 TaskScheduler,默认为 TaskScheduler.Default
定时执行工作

以下代码创立了一个计时器,并且立刻执行一次 SomeMethod 办法,之后每距离 1s 继续执行 SomeMethod 办法

private static  System.Threading.Timer s_Timer;

static void Main()
{s_Timer = new Timer(SomeMethod, 6, 0, Timeout.Infinite);
    Console.ReadKey();}

private static void SomeMethod(object state)
{Console.WriteLine("state = {0}", state);

    // 让 Timer 在 1s 后再调用这个办法
    s_Timer.Change(1000, Timeout.Infinite);
}

在外部,线程池为所有 Timer 对象只应用了一个线程。该线程晓得下一个 Timer 对象还须要多久触发,下一个 Timer 对象触发时线程就会唤醒,在外部调用 ThreadPool 的 QueueUserWorkItem,将一个工作项增加到线程池的申请队列中,使回调办法能够失去调用。如果回调办法执行工夫很长,计时器可能再次触发,这种状况可能造成多个线程池线程同时执行回调办法。解决这个问题能够在结构 Timer 对象时,将 period 参数指定为 Timeout.Infinite,这样计时器就只会触发一次。如果要循环执行计时器能够在回调办法外部调用 Change 办法,如上述代码那样。

运行后果

能够看到控制台立刻输入 state = 6 之后每隔 1s 在输入 state = 6

不举荐的做法
private static  System.Threading.Timer s_Timer;

static void Main()
{s_Timer = new Timer(SomeMethod, 6, 0, 1000);
    Console.ReadKey();}

private static void SomeMethod(object state)
{Console.WriteLine("state = {0}", state);
}

这种做法尽管与上一段代码后果统一,一旦回调办法执行工夫过长,超过 period 参数指定的调用回调办法的工夫距离,那么便可能呈现多个线程同时执行回调办法,这并不是想要的后果。

CLR 线程池
数据结构图

治理工作者线程

ThreadPool.QueueUserWorkItem 和 Timer 类总是将工作项放到全局队列中,工作者线程采纳先入先出的算法将工作项从队列中取出,并解决它们。因为多个工作者线程可能同时从全局队列中取出工作项,所以所有工作者线程都竞争同一个线程同步锁。

每个工作者线程都有一个本人的本地队列,当一个工作者线程调度一个 Task 时,Task 会增加到调用线程的本地队列。工作者线程筹备解决一个工作项时,它总是会先查看它的本地队列,如果存在一个 Task,工作者线程就从它的本地队列中移除 Task,并对工作项进行解决(工作者线程采纳的是后入先出的算法取出队列中的工作项)。因为每个工作者线程的本地队列只有本人能拜访,所以无需线程同步锁。

当工作者线程发现自己的本地队列为空时,工作者线程就会尝试从另一个工作者线程的本地队列的尾部取出一个工作项,并要求获取一个线程同步锁(对性能有些许影响)。如果所有本地队列都为空,那么工作者线程会应用先入先出算法尝试从全局队列取出工作项并获取线程同步锁,如果全局队列也为空,那么工作者线程将进入睡眠状态,期待事件的产生。如果睡眠工夫太长,将主动唤醒并销毁本身。

线程池会疾速创立工作者线程,使数量等于调用 ThreadPool.SetMinThreads 办法传递的值,如果没有调用该办法,默认等于过程容许应用的 CPU 数量。通常过程容许应用机器上的所有 CPU 数,所以线程池创立的工作者线程数量很快便会达到机器上的 CPU 数量。创立线程后,线程池会监督工作项的实现速度,如果工夫太长,线程池会创立更多的线程,如果工作项实现速度很快,工作者线程就会被销毁。

CPU 缓存行和伪共享
缓存行

为了晋升拜访内存的速度,CPU 在逻辑上将所有内存都划分为缓存行,缓存行是 2 的整数幂个间断字节,最常见的缓存行大小是64 个字节,所以 CPU 从 RAM 中获取并存储 64 个字节块。例如应用程序读取一个 Int32 的数据,那么会获取蕴含这个 Int32 值的 64 个字节。获取更多的字节通常能够加强性能,因为应用程序大多数在拜访一些数据之后持续拜访这些数据四周的其它数据。此时因为相邻的数据曾经在 CPU 的缓存中了,就防止了慢速度的 RAM 拜访。

然而,如果两个或多个内核拜访同一个缓存行中的字节,内核之间必须相互通信,并在不同的内核之间传递缓存行,造成多个内核不能同时解决相邻的字节,从而对性能造成重大的影响。

代码测试
private const int COUNT = 100000000;

private static int s_OperationCount = 2;
private static long s_StartTime;

class SomeType
{
    public int Field1;
    public int Field2;
}

private static void AccessField(SomeType type, int field)
{
    // 线程各自拜访 type 中的字段
    for (int i = 0; i < COUNT; i++)
    {if (field == 0)
            type.Field1++;
        else
            type.Field2++;
    }

    // 最初一个线程完结后显示破费工夫
    if(Interlocked.Decrement(ref s_OperationCount) == 0)
        Console.WriteLine("破费工夫:{0}", (Stopwatch.GetTimestamp() - s_StartTime) / (Stopwatch.Frequency / 1000));
}

static void Main()
{var type = new SomeType();
    s_StartTime = Stopwatch.GetTimestamp();

    // 两个线程拜访对象中的字段
    ThreadPool.QueueUserWorkItem(o => AccessField(type, 0));
    ThreadPool.QueueUserWorkItem(o => AccessField(type, 1));

    Console.ReadKey();}

上述代码的 type 对象蕴含两个字段 Field1 和 Field2,这两个字段极有可能在同一个缓存行中,接着启动两个线程执行 AccessField 办法,一个线程操作 Field1,另一个线程操作 Field2,每个线程实现时递加 s_OperationCount 的值,最初显示两个线程实现工作破费的总工夫。

运行后果

接着批改一下 SomeType 类,让它变成这样:

[StructLayout(LayoutKind.Explicit)]
class SomeType
{[FieldOffset(0)]
    public int Field1;

    [FieldOffset(64)]
    public int Field2;
}

批改后的 SomeType 类应用了缓存行分隔 Field1 字段和 Field2 字段,在第一个版本中这个两个字段属于同一个缓存行,造成不同的 CPU 必须不停的来回传递字节。尽管从程序的角度看,两个线程解决的是不同的数据,但从 CPU 的缓存行角度看,CPU 解决的是雷同的数据,称为 伪共享。在批改后的代码中,字段别离属于不同的缓存行,所以 CPU 能够做到独立工作,不用共享。

再次执行查看后果,速度显著晋升

拜访数组

因为数组在数组内存起始处保护着数组的长度信息,具体位置是在前几个元素之后。拜访一个元素时,CLR 会验证应用的索引是否在数组的长度范畴内。所以拜访一个数组的元素总是会牵扯到拜访数组的长度,因而为了防止产生额定的伪共享,应防止让一个线程拜访数组的前几个元素,同时让另一个线程拜访数组中的其它元素。

正文完
 0