关于数据库:Lets-Go-Rust-系列之定时器-Ticker-Timer

5次阅读

共计 18574 个字符,预计需要花费 47 分钟才能阅读完成。

前言

在理论我的项目开发中,常常会有定时工作的性能开发需要,定时工作次要分为两种,

  • 在固定的时刻执行某个工作,也就是 Timer
  • 基于固定的工夫距离,周期的执行某个工作,也就是 Ticker

很多基于工夫的调度工作框架都离不开这两种类型。

本文将会别离介绍在 Golang 和 Rust 语言中这两种定时器类型以及它们的应用办法。

Golang

Golang 的规范库 time 中就蕴含了 Ticker 和 Timer 这两种定时器类型,在 package 中增加援用就可, 如下:​

`import (

"time"

)`

Rust

本文中,对于 Rust 将会应用第三方 crate crossbeam-channel 提供的定时器类型,因为这个 crate 中的个性和 Golang 的个性是十分相似的,两者之间才具备可比性。

  • https://docs.rs/crossbeam/0.8…
  • https://docs.rs/crossbeam/0.8…

因为是第三方 create,所以在 Cargo.toml 中增加上面内容

crossbeam = "0.8"
crossbeam-channel = "0.5"

另外在代码中增加如下援用

use std::time::{Duration, Instant};
use crossbeam::select;
use crossbeam_channel::tick;
use crossbeam_channel::after;
use crossbeam_channel::unbounded;
use std::thread;

接下来,本文会基于不同的性能用例别离介绍在 Rust 和 Golang 中如何创立和应用 Ticker,并进行比照。

Ticker
首先介绍 Rust 和 Golang 中如何创立和应用 Ticker

Rust
在 Rust 的 crat crossbeam_channel 中应用 crossbeam_channel::tick 创立 Ticker

crossbeam_channel::tick 官网形容

/// Creates a receiver that delivers messages periodically.
///
/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
/// sent.

翻译过去就是:

返回一个 channel 的 receiver, 这个 channel 会周期性的传递进去音讯。
这个 channel 的容量是 1,永远不会敞开。
每隔固定工夫距离发送音讯到 channel 中,音讯的值就是音讯发送时刻的 instant。

看下 tick 的源码如下,能够看到 tick 返回的是一个 channel 的 Receiver

pub fn tick(duration: Duration) -> Receiver<Instant> {
    Receiver {flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
    }
}

再进入到 flavors::tick::Channel::new 中看到 flavors::tick::Channel 的定义和办法如下

pub(crate) struct Channel {
    /// The instant at which the next message will be delivered.
    delivery_time: AtomicCell<Instant>,

    /// The time interval in which messages get delivered.
    duration: Duration,
}

impl Channel {
    /// Creates a channel that delivers messages periodically.
    #[inline]
    pub(crate) fn new(dur: Duration) -> Self {
        Channel {delivery_time: AtomicCell::new(Instant::now() + dur),
            duration: dur,
        }
    }

    /// Attempts to receive a message without blocking.
    #[inline]
    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
        loop {let now = Instant::now();
            let delivery_time = self.delivery_time.load();

            if now < delivery_time {return Err(TryRecvError::Empty);
            }

            if self
                .delivery_time
                .compare_exchange(delivery_time, now + self.duration)
                .is_ok()
            {return Ok(delivery_time);
            }
        }
    }

    /// Receives a message from the channel.
    #[inline]
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
        loop {let delivery_time = self.delivery_time.load();
            let now = Instant::now();

            if let Some(d) = deadline {
                if d < delivery_time {
                    if now < d {thread::sleep(d - now);
                    }
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            if self
                .delivery_time
                .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
                .is_ok()
            {
                if now < delivery_time {thread::sleep(delivery_time - now);
                }
                return Ok(delivery_time);
            }
        }
    }

    /// Reads a message from the channel.
    #[inline]
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {token.tick.ok_or(())
    }

    /// Returns `true` if the channel is empty.
    #[inline]
    pub(crate) fn is_empty(&self) -> bool {Instant::now() < self.delivery_time.load()}

    /// Returns `true` if the channel is full.
    #[inline]
    pub(crate) fn is_full(&self) -> bool {!self.is_empty()
    }

    /// Returns the number of messages in the channel.
    #[inline]
    pub(crate) fn len(&self) -> usize {if self.is_empty() {0} else {1}
    }

    /// Returns the capacity of the channel.
    #[allow(clippy::unnecessary_wraps)] // This is intentional.
    #[inline]
    pub(crate) fn capacity(&self) -> Option<usize> {Some(1)
    }
}

留神下面 capacity 中返回的是 Some(1),验证了容量是 1 的说法。这个容量是 1 的个性比拟要害,在前面样例中会讲到。

疾速上手

fn simple_ticker() {let start = Instant::now();
    let ticker = tick(Duration::from_millis(100));

    for _ in 0..5 {let msg = ticker.recv().unwrap();
        println!("{:?} elapsed: {:?}",msg, start.elapsed());
    }

}

这个例子外面创立了一个距离是 100ms 的 ticker,每隔 100ms 就能够从 ticker 中获取一个 message,输入如下

Instant {tv_sec: 355149, tv_nsec: 271585400} elapsed: 100.0824ms
Instant {tv_sec: 355149, tv_nsec: 371585400} elapsed: 200.3341ms
Instant {tv_sec: 355149, tv_nsec: 471585400} elapsed: 300.3773ms
Instant {tv_sec: 355149, tv_nsec: 571585400} elapsed: 400.2563ms
Instant {tv_sec: 355149, tv_nsec: 671585400} elapsed: 500.379ms

ticker 所在线程休眠

crossbeam_channel::tick 所关联的 channel 的容量是 1,如果通道中曾经有了 message,那么后续新的 message 过去后就会被抛弃,咱们能够用上面样例验证

fn sleep_ticker(){let ms = |ms| Duration::from_millis(ms);
    // Returns `true` if `a` and `b` are very close `Instant`s.
    // 如果工夫 a 和 b 相差不到 50 milliseconds , 就返回 true
    let eq = |a,b| a+ms(50) > b &&  b+ms(50)>a;

    let start = Instant::now();

    // 定时器每隔 100 milliseconds 往 r 对应的 channel 中发送一个音讯
    let r = tick(ms(100));

    // This message was sent 100 ms from the start and received 100 ms from the start.
    // tick 开始 100 ms 后,收到音讯
    assert!(eq(r.recv().unwrap(), start + ms(100)));
    // 确实过来了 100 ms
    assert!(eq(Instant::now(), start + ms(100)));


    // 这里 tick 对应的 channel 外面是空, 所以在 [100ms 600ms] 线程休眠的工夫内,第 200ms 的音讯依然失常写入,300ms,400ms,500ms,600ms 的都无奈写进去
    thread::sleep(ms(500));

    // This message was sent 200 ms from the start and received 600 ms from the start.
    // tick 开始后的 200 ms,收到音讯
    // 这里到了第 600ms 时刻,channel 外面有 200ms 的音讯
    assert!(eq(r.recv().unwrap(), start + ms(200)));
    assert!(eq(Instant::now(), start + ms(600)));

    // 这里 tick 对应的 channel 又变为空了,所以 700ms 的音讯能够失常写入
    // This message was sent 700 ms from the start and received 700 ms from the start.
    // tick 开始后的 700ms,收到音讯
    assert!(eq(r.recv().unwrap(), start + ms(700)));
    assert!(eq(Instant::now(), start + ms(700)));

}

这个例子中,承受第一个 100ms 的音讯后,以后 thread 睡眠了 500ms,而后从 ticker 中承受了 Instant(200ms) 的音讯,下一次承受的是 Instant(700ms) 的音讯。这个后果咋一看感觉特地诡异,为什么承受了 Instant(200ms) 的音讯后,再次收到的就是 Instant(700ms) 的音讯呢? 起因在于 ticker 绑定的 channel 的容量只有 1,所以:

  1. 第 100ms 音讯达到时,channel 是空,往 channel 中填入胜利,而后 r.recv().unwap() 读取音讯时候又把 channel 清空了
  2. 以后 thread 进入休眠
  3. 在以后 thread 休眠的期间,所以 200ms 音讯达到时,channel 是空,往 channel 中填入胜利,此时 channel 满了
  4. 随后 300ms,400ms,500ms,600ms 的音讯达到时,channel 是满的,音讯被抛弃
  5. 600ms 后以后 thread 醒来,执行 r.recv().unwap() 读取音讯时候,此时 channel 中的是 200ms 的音讯,所以读出来是 Instant(200ms),并把 channel 清空了
  6. 700ms 的音讯达到时候,channel 是空,往 channel 中填入胜利,r.recv().unwap() 就读取到了 700ms 的音讯

参加 select

因为 tick 返回的是 Receiver,所以能够放入到 select 中跟失常 crossbeam_channel 创立的 channel 一起进行监听,如下

fn select_channl(){let start = Instant::now();
    let ticker = tick(Duration::from_millis(100));
    let (_s,r) = unbounded::<()>(); 
    // 这里如果写成  let (_,r) = unbounded::<()>();  那么 r 就会始终可读,读取的数据 Err(RecvError)

    for _ in 0..5 {

        select! {recv(r)->msg=>{println!("recve {:?}",msg);
            },
            recv(ticker)->msg=>{println!("elapsed: {:?} {:?}", msg.unwrap(),start.elapsed());
            },
        }
    }
}
elapsed: Instant {tv_sec: 179577, tv_nsec: 291446700} 100.4331ms
elapsed: Instant {tv_sec: 179577, tv_nsec: 391877000} 200.8107ms
elapsed: Instant {tv_sec: 179577, tv_nsec: 492246700} 301.2404ms
elapsed: Instant {tv_sec: 179577, tv_nsec: 592683200} 401.4015ms
elapsed: Instant {tv_sec: 179577, tv_nsec: 692843600} 501.5007ms

这里须要留神的是,下面 let (_s,r) = unbounded::<()>(); 如果写成 let (_,r) = unbounded::<()>();的话,select! 就会始终进入到 recv(r)->msg 分支中,读取的音讯是 Err(RecvError) ​

具体起因跟 disconnection 无关,let (_,r) = unbounded::<()>();相当于 Sender 在一开始就被 drop 掉了。

When all senders or all receivers associated with a channel get dropped, the channel becomes disconnected. No more messages can be sent, but any remaining messages can still be received. Send and receive operations on a disconnected channel never block.

翻译过去就是:

当一个 channel 所关联的所有 sender 或 所有的 receiver 都被 drop 掉之后,这个 channel 就会变成 disconnected. 不能够再往里面发送音讯,然而如果 channel 外面有残余的音讯能够持续承受。对一个 disconnected 的 channel 进行 Send 或 receive 操作都不会阻塞。

golang

在 golang 外面应用 time.NewTicker 来创立一个 Ticker 官网形容

NewTicker returns a new Ticker containing a channel that will send the time on the channel after each tick. The period of the ticks is specified by the duration argument. The ticker will adjust the time interval or drop ticks to make up for slow receivers. The duration d must be greater than zero; if not, NewTicker will panic. Stop the ticker to release associated resources.

这里提到 ticker 能够应用 Reset 重置距离,应用 Stop 来敞开 ticker,不过 Stop 并不会敞开 ticker 所关联的 channel。

Ticker 的外部源码实现

// A Ticker holds a channel that delivers ``ticks'' of a clock
// at intervals.
type Ticker struct {
    C <-chan Time // The channel on which the ticks are delivered.
    r runtimeTimer
}

// NewTicker returns a new Ticker containing a channel that will send
// the time on the channel after each tick. The period of the ticks is
// specified by the duration argument. The ticker will adjust the time
// interval or drop ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will
// panic. Stop the ticker to release associated resources.
func NewTicker(d Duration) *Ticker {
    if d <= 0 {panic(errors.New("non-positive interval for NewTicker"))
    }
    // Give the channel a 1-element time buffer.
    // If the client falls behind while reading, we drop ticks
    // on the floor until the client catches up.
    c := make(chan Time, 1)
    t := &Ticker{
        C: c,
        r: runtimeTimer{when:   when(d),
            period: int64(d),
            f:      sendTime,
            arg:    c,
        },
    }
    startTimer(&t.r)
    return t
}

能够看到 golang 的 Ticker 也是关联了一个 channel,且这个 channel 的 buffer 长度也是 1.

疾速上手

func simple_ticker() {tick := time.NewTicker(time.Duration(100 * time.Millisecond))
    start := time.Now()
    for i := 0; i < 5; i++ {
        msg := <-tick.C
        fmt.Printf("%v elapsed: %v\n", msg, time.Since(start))
    }
}

执行后果如下:

2021-09-26 11:50:11.3452615 +0800 CST m=+0.100763101 elapsed: 100.807ms
2021-09-26 11:50:11.4447965 +0800 CST m=+0.200297901 elapsed: 200.2688ms
2021-09-26 11:50:11.5453194 +0800 CST m=+0.300820801 elapsed: 300.7901ms
2021-09-26 11:50:11.6448699 +0800 CST m=+0.400371301 elapsed: 400.3422ms
2021-09-26 11:50:11.7452991 +0800 CST m=+0.500800501 elapsed: 500.7743ms

ticker 所在线程休眠
golang 版本的 Ticker 和 crossbeam_channel::tick 相似,也是关联了一个 buffer 长度是 1 的 channel,所以执行的逻辑也和 crossbeam_channel::tick 的统一,这里就不多过解释。

func ms(d int) time.Duration {return time.Duration(d * int(time.Millisecond))
}
func eq(a, b time.Time) bool {return a.Add(ms(50)).After(b) && b.Add(ms(50)).After(a)
}

func assert(a, b time.Time) {if !eq(a, b) {panic(a)
    }
}

func sleep_ticker() {start := time.Now()

    // 定时器每隔 100 milliseconds 往 r 对应的 channel 中发送一个音讯
    r := time.NewTicker(ms(100))
    defer r.Stop()

    // This message was sent 100 ms from the start and received 100 ms from the start.
    // tick 开始 100 ms 后,收到音讯
    msg := <-r.C
    assert(msg, start.Add(ms(100)))
    // 确实过来了 100 ms
    assert(time.Now(), start.Add(ms(100)))

    // 这里 tick 对应的 channel 外面是空, 所以在 [100ms 600ms] 线程休眠的工夫内,第 200ms 的音讯依然失常写入,300ms,400ms,500ms,600ms 的都无奈写进去
    time.Sleep(ms(500))

    // This message was sent 200 ms from the start and received 600 ms from the start.
    // tick 开始后的 200 ms,收到音讯
    // 这里到了第 600ms 时刻,channel 外面有 200ms 的音讯
    msg = <-r.C
    assert(msg, start.Add(ms(200)))
    assert(time.Now(), start.Add(ms(600)))

    // 这里 tick 对应的 channel 又变为空了,所以 700ms 的音讯能够失常写入
    // This message was sent 700 ms from the start and received 700 ms from the start.
    // tick 开始后的 700ms,收到音讯
    msg = <-r.C
    assert(msg, start.Add(ms(700)))
    assert(time.Now(), start.Add(ms(700)))

}

参加 select
因为 Ticker 的成员 C 自身就是一个 channel

type Ticker struct {
    C <-chan Time // The channel on which the ticks are delivered.
    r runtimeTimer
}

所以能够把 Ticker.C 退出到 select 中,如下:

func select_ticker() {tick := time.NewTicker(time.Duration(100 * time.Millisecond))
    defer tick.Stop()
    start := time.Now()

    r := make(chan int)
    for i := 0; i < 5; i++ {
        select {
        case msg := <-tick.C:
            fmt.Printf("%v elapsed: %v\n", msg, time.Since(start))
        case <-r:
            fmt.Println("recv from r")
        }

    }
}

输入

2021-09-24 14:57:23.7813998 +0800 CST m=+0.100670501 elapsed: 100.6697ms
2021-09-24 14:57:23.8818368 +0800 CST m=+0.201107601 elapsed: 201.0681ms
2021-09-24 14:57:23.9814607 +0800 CST m=+0.300731501 elapsed: 300.7018ms
2021-09-24 14:57:24.0810694 +0800 CST m=+0.400340201 elapsed: 400.3102ms
2021-09-24 14:57:24.1815779 +0800 CST m=+0.500848601 elapsed: 500.8122ms

Timer

Rust

官网形容

Creates a receiver that delivers a message after a certain duration of time.
援用
The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will be sent into the channel after duration elapses. The message is the instant at which it is sent.

翻译过去就是

创立一个 channel 的 receiver, 通过特定工夫距离后这个 channel 中会塞入音讯
这个 channel 的容量是 1,永远不会敞开。每隔固定距离发送音讯到 channel 中,音讯的值就是代表音讯发送时刻的 Instant。

看下 after 的源码如下,同样能够看到 after 返回的也是是一个 channel 的 Receiver

pub fn after(duration: Duration) -> Receiver<Instant> {
    Receiver {flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
    }
}

进入到 flavors::at::Channel::new_timeout

/// Channel that delivers a message at a certain moment in time
pub(crate) struct Channel {
    /// The instant at which the message will be delivered.
    delivery_time: Instant,

    /// `true` if the message has been received.
    received: AtomicBool,
}

impl Channel {
    /// Creates a channel that delivers a message at a certain instant in time.
    #[inline]
    pub(crate) fn new_deadline(when: Instant) -> Self {
        Channel {
            delivery_time: when,
            received: AtomicBool::new(false),
        }
    }
    /// Creates a channel that delivers a message after a certain duration of time.
    #[inline]
    pub(crate) fn new_timeout(dur: Duration) -> Self {Self::new_deadline(Instant::now() + dur)
    }

    /// Attempts to receive a message without blocking.
    #[inline]
    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
        // We use relaxed ordering because this is just an optional optimistic check.
        if self.received.load(Ordering::Relaxed) {
            // The message has already been received.
            return Err(TryRecvError::Empty);
        }

        if Instant::now() < self.delivery_time {
            // The message was not delivered yet.
            return Err(TryRecvError::Empty);
        }

        // Try receiving the message if it is still available.
        if !self.received.swap(true, Ordering::SeqCst) {
            // Success! Return delivery time as the message.
            Ok(self.delivery_time)
        } else {
            // The message was already received.
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    #[inline]
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
        // We use relaxed ordering because this is just an optional optimistic check.
        if self.received.load(Ordering::Relaxed) {
            // The message has already been received.
            utils::sleep_until(deadline);
            return Err(RecvTimeoutError::Timeout);
        }

        // Wait until the message is received or the deadline is reached.
        loop {let now = Instant::now();

            let deadline = match deadline {
                // Check if we can receive the next message.
                _ if now >= self.delivery_time => break,
                // Check if the timeout deadline has been reached.
                Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),

                // Sleep until one of the above happens
                Some(d) if d < self.delivery_time => d,
                _ => self.delivery_time,
            };

            thread::sleep(deadline - now);
        }

        // Try receiving the message if it is still available.
        if !self.received.swap(true, Ordering::SeqCst) {
            // Success! Return the message, which is the instant at which it was delivered.
            Ok(self.delivery_time)
        } else {
            // The message was already received. Block forever.
            utils::sleep_until(None);
            unreachable!()}
    }

    /// Reads a message from the channel.
    #[inline]
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {token.at.ok_or(())
    }

    /// Returns `true` if the channel is empty.
    #[inline]
    pub(crate) fn is_empty(&self) -> bool {
        // We use relaxed ordering because this is just an optional optimistic check.
        if self.received.load(Ordering::Relaxed) {return true;}

        // If the delivery time hasn't been reached yet, the channel is empty.
        if Instant::now() < self.delivery_time {return true;}

        // The delivery time has been reached. The channel is empty only if the message has already
        // been received.
        self.received.load(Ordering::SeqCst)
    }

    /// Returns `true` if the channel is full.
    #[inline]
    pub(crate) fn is_full(&self) -> bool {!self.is_empty()
    }

    /// Returns the number of messages in the channel.
    #[inline]
    pub(crate) fn len(&self) -> usize {if self.is_empty() {0} else {1}
    }

    /// Returns the capacity of the channel.
    #[allow(clippy::unnecessary_wraps)] // This is intentional.
    #[inline]
    pub(crate) fn capacity(&self) -> Option<usize> {Some(1)
    }
}

和 tick 一样,办法 capacity 中返回的是 Some(1),表明 after 返回的 Receiver 所关联的 chnnel 的容量确实也是 1。

疾速上手

fn simple_after() {let start = Instant::now();
    let af = after(Duration::from_millis(100));

    for _ in 0..5 {af.recv().unwrap();
        println!("elapsed: {:?}", start.elapsed());
    }

}

下面样例中,在 100ms 的时刻会往 af 关联 channel 写入音讯,之后就不会再有新的音讯达到 af 关联 channel,所以 af.recv() 在收到 100ms 的音讯后,前面就会永远阻塞,输入如下:

`elapsed: 100.1125ms
^C`

after 所在线程休眠

fn sleep_after(){
    // Converts a number of milliseconds into a `Duration`.
    let ms = |ms| Duration::from_millis(ms);

    // Returns `true` if `a` and `b` are very close `Instant`s.
    let eq = |a, b| a + ms(50) > b && b + ms(50) > a;

    let start = Instant::now();
    let r = after(ms(100));

    thread::sleep(ms(500));

    // This message was sent 100 ms from the start and received 500 ms from the start.
    assert!(eq(r.recv().unwrap(), start + ms(100)));
    assert!(eq(Instant::now(), start + ms(500)));
}

因为以后 thread 休眠的时候,r 关联的 channel 是空的,所以当 100ms 的音讯到来的时候能够胜利写入到 r 关联的 channel 中,而后等以后 thread 醒来后,执行 r.recv().unwrap() 就能够拿到 100ms 的音讯。​

参加 select

与 tick 同理,after 也能够参加到 select 中

fn select_after() {let start = Instant::now();
    let (_s, r) = unbounded::<i32>();
    let timeout = Duration::from_millis(100);

    select! {recv(r) -> msg => println!("received {:?}", msg),
        recv(after(timeout)) -> msg => println!("timed out {:?} {:?}",msg.unwrap(),start.elapsed()),
    }

}

输入

timed out Instant {tv_sec: 181366, tv_nsec: 193851700} 100.1291ms

at

at 和 after 比拟相似,at 是指定具体的工夫点,官网形容如下:

Creates a receiver that delivers a message at a certain instant in time.
援用
The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will be sent into the channel at the moment in time when. The message is the instant at which it is sent, which is the same as when. If when is in the past, the message will be delivered instantly to the receiver.

须要留神的是,如果指定的工夫点比以后工夫还要早,那么会立即发送音讯到 channel ​

golang

应用 Timer 官网形容

// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.

构造体定义

type Timer struct {
    C <-chan Time
    // contains filtered or unexported fields
}

// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {c := make(chan Time, 1)
    t := &Timer{
        C: c,
        r: runtimeTimer{when: when(d),
            f:    sendTime,
            arg:  c,
        },
    }
    startTimer(&t.r)
    return t
}

也是关联一个缓存长度是 1 的 channel。

疾速上手

func simple_timer() {tm := time.NewTimer(time.Duration(100 * time.Millisecond))
    defer tm.Stop()
    start := time.Now()
    // 承受一次后就永远阻塞了,程序报死锁谬误
    for i := 0; i < 2; i++ {
        msg := <-tm.C
        fmt.Printf("%v elapsed: %v\n", msg, time.Since(start))
    }
}

下面样例中,在 100ms 的时刻会往 tm.C 写入音讯,之后就不会再有新的音讯达到 tm.C,所以 msg:=<-tm.C 在收到 100ms 的音讯后,前面就会永远阻塞,输入如下:

2021-09-24 15:08:56.8384098 +0800 CST m=+0.100381801 elapsed: 100.3695ms
fatal error: all goroutines are asleep - deadlock!

所在线程休眠

func sleep_timer() {start := time.Now()

    // 100 milliseconds 后 往 r 对应的 channel 中发送一个音讯
    r := time.NewTimer(ms(100))
    defer r.Stop()

    // 在休眠过程中的 100ms 的时候,往 r.C 中写入了 100ms 的
    time.Sleep(ms(500))

    // 此时外面的是 100ms 的音讯
    msg := <-r.C
    assert(msg, start.Add(ms(100)))
    // 确实过来了 500 ms
    assert(time.Now(), start.Add(ms(500)))

    // 到这里就永远阻塞了
    msg = <-r.C
    assert(msg, start.Add(ms(200)))
    assert(time.Now(), start.Add(ms(600)))

}

以后 thread 休眠的时候,r.C 是空的,所以当 100ms 的音讯到来的时候能够胜利写入到 r.C 中,而后等以后 thread 醒来后,执行 msg := <-r.C 就能够拿到 100ms 的音讯在过程输入。前面因为不会再有新的音讯写入到 r.C 中,执行 msg := <-r.C 就永远阻塞了。

fatal error: all goroutines are asleep - deadlock!

参加 select
与 time.Ticker 同理,能够把 Timer.C 退出到 select 中. 上面代码中因为 tm.C 只会收到一次音讯,所以 for 循环中在第二次时就永远阻塞了。

func select_timer() {tm := time.NewTimer(time.Duration(100 * time.Millisecond))
    defer tm.Stop()
    start := time.Now()

    r := make(chan int)

    for i := 0; i < 2; i++ {
        select {
        case msg := <-tm.C:
            fmt.Printf("%v elapsed: %v\n", msg, time.Since(start))
        case <-r:
            fmt.Println("recv from r")
        }

    }
}

输入

2021-09-24 15:13:09.9339061 +0800 CST m=+0.100970401 elapsed: 100.9088ms
fatal error: all goroutines are asleep - deadlock!

Instant

instant 在 Rust 和 golang 中用来度量工夫,它们的区别取下:

Rust
目前有两种办法能够获取以后工夫 Instant::now 和 SystemTime::now.

  • Instant: 返回的是 monotonic clock,次要用于在计算工夫距离时候有用。
  • SystemTime: 返回的是 wall clock,次要用于跟文件系统或其它过程之间进行沟通应用。

在 Rust 中,这两个类型的打印后果对人类来说都是不直观的,也没有提供响应的格式化打印的办法。

golang
Time

time.Now() 返回的 Time 类型蕴含 monotonic clock。

  1. 如果 Time t 蕴含 monotonic clock,那么 t.Add 会同时把 duration 加到 wall clock 和 monotonic clock 上。
  2. 因为 t.AddDate(y,m,d) ,t.Round(d) ,t.Truncate(d) 是 wall clock 的计算,所以这些办法计算后的后果会去掉 monotonic clock.
  3. 如果 Time t ,Time u 都蕴含 monotonic clock,那么 t.After(u), t.Before(u), t.Equal(u), and t.Sub(u) 在计算的时候只会应用 monotonic clock. 否则如果 u 或 t 任意一个不蕴含 monotonic clock,就应用 wall clock 来计算。
  4. t.GobEncode, t.MarshalBinary, t.MarshalJSON, and t.MarshalText 会疏忽掉 monotonic clock。
  5. == 在计算的时候,不仅仅会比拟 instant, 还会比拟 Location 和 monotonic clock.
  6. 程序在应用 Time 的时候应该传值而不是指针。也就是说 time 变量或构造体成员应该是类型 time.Time 而不是 *time.Time
  7. Time 值类型的变量能够被多个 goroutine 平安的并发拜访,除了 GobDecode, UnmarshalBinary, UnmarshalJSON and UnmarshalTex
  8. Time instants 能够应用 Before, After, 和 Equal 办法进行比拟。
  9. Each Time has associated with it a Location, consulted when computing the presentation form of the time, such as in the Format, Hour, and Year methods. The methods Local, UTC, and In return a Time with a specific location. Changing the location in this way changes only the presentation; it does not change the instant in time being denoted and therefore does not affect the computations described in earlier paragraphs.

正文完
 0