乐趣区

C框架

thrift

在网络一节中简单介绍了 thrift 的协议部分,在工程中会用得到 thrift 的线程并发,process,server 库。定义 idl 后生成代码和业务编写代码的关系如下:

运行过程:

1. 开启 threaft 线程池
    主线程创建 n 个,开启,数量不够 workerMonitor_.wait。到 100 个就死了(加锁,结束释放)工作线程开启后,加锁,增加数量,workerMonitor_.notify,任务空 monitor_.wait(),否则取任务,判断等待队列长度不到阈值则 manager_->maxMonitor_.notify(),释放锁。执行任务。结束后继续抢锁循环
2. 开启 nonblockingserver,io 线程就绪
    非 0 号其他线程先 start,设置 eventbase(iothread),createpipe, 注册 notify;event_base_loop(eventBase_, 0);【无监听,每个 io 线程自己的 event_base】0 号线程注册事件,设置 eventbase(iothread);注册监听;createpipe, 注册 notify。0 号 io 线程 run,开始监听。其他 io 线程 join
3.0 号监听到 handleEvent
    accept
    加锁 create connection 分配连接给 io 线程(轮询)释放锁,通知分配的线程 notifyhandler
4. 分配到连接的 IO 线程 notifyhandler(read notifyfd,transition)
    本次 transition: 读取,调用 addtask=>setidle, 不需要监听 cfd
5.addtask
    thrift,加锁,如果 tasks_.size() >= pendingTaskCountMax_,maxMonitor_.wait(timeout);加入 task 队列,有空闲线程 monitor_.notify()。任何一种 monitor 都公用一个锁。这里的 task 就是 process 然后 notifyIOThread(read notifyfd,transition)。6. 处理后通知 IO 线程
    transition 将 cfd 改为监听写事件, 加入到本线程的事件监听中,调用 connenction 的回调发送。7.connenction 的回调发送之后继续 notifyIOThread 本次 transition 重置缓存区结束。


总结:多 reactor 多线程模式,一个 accept,多个读写,单独任务处理。正常只需要一个 reactor。单 reactor 多线程形式。

http_server

关于优雅重启

nginx 这种多进程的比价好做,因为子进程可以独立于父进程。
主进程 fork,继承监听 fd, 锁等,exec() 执行完整代码。此时旧的子进程和新的子进程都可以抢锁监听 fd 处理连接,关闭旧主进程,给旧的子进程发送关闭信号,子进程在处理后才会监听到信号,做到了优雅。
线程没办法独立监听信号。

连接池


add 的就是任意连接对象。实现 connect,reconnect.
比如

    for (int i = 0; i < connectionCount; ++i) {RedisClient* redis = new RedisClient(host, port, conn_timeout_ms, rw_timeout_ms);
        redis->init();//CONNECT
        redisPool_.add(redis);
    }

改造的 redis_pool

连接池 + 线程池 +hiredis 分别负责连接管理和并发请求处理。
封装目的:一般并发到分片获取数据的代理都有以下缺点:一个失败全部失败,要等所有返回才返回,而 mget 的失败会被放大。因此自己在业务层控制整个 mget 的超时时间和返回,到代理层已经拆分为当个 get,用线程池实现。

spdlog

  • 业务调用

    spdlog::set_async_mode(8192*4, spdlog::async_overflow_policy::block_retry,nullptr, std::chrono::seconds(3));
    std::string info_file =  FLAGS_log_path + "/" + FLAGS_info_file
    auto debug_logger = spdlog::basic_logger_mt("debug_logger", info_file.c_str());
    debug_logger->set_pattern("INFO: %Y-%m-%d %H:%M:%S %v");
    
    
    inline std::shared_ptr<spdlog::logger> spdlog::create(const std::string& logger_name, Args... args)
    {sink_ptr sink = std::make_shared<Sink>(args...);
        return details::registry::instance().create(logger_name, { sink});    
        /* 锁控制  new_logger = std::make_shared<async_logger>(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb);    // 这里启线程
        _loggers[logger_name] = new_logger;*/
    }
    
     auto logger = spdlog::get("warn_logger");\
               if (logger != NULL) { \
                   logger->info("{}:{} {}", cplusutils::servbase_basename(__FILE__), __LINE__, log_info.str()); \
               }
    
    info()=>log()->push_msg()
    
  • spdlog 的 push_msg 就是 enqueue

    inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg)
    {if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg)
        {auto last_op_time = details::os::now();
            auto now = last_op_time;
            do
            {now = details::os::now();
                sleep_or_yield(now, last_op_time);
            }
            while (!_q.enqueue(std::move(new_msg)));
        }
    }
  • spdlog 每个日志都一个线程,启动后会循环等 dequeue 到落盘

    _worker_thread(&async_log_helper::worker_loop, this)
    while (active)
        {
            try
            {active = process_next_msg(last_pop, last_flush);
            }
        }
    
    
        inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush)
    {
        async_msg incoming_async_msg;
    
        if (_q.dequeue(incoming_async_msg))
        {for (auto &s : _sinks)
                {if (s->should_log(incoming_log_msg.level))
                    {s->log(incoming_log_msg);   // 调用正常的文件读写。}
                }
           
        }
        else
        {auto now = details::os::now();
            handle_flush_interval(now, last_flush);
            sleep_or_yield(now, last_pop);
            return !_terminate_requested;
        }
    }
    
  • 无锁队列

    bool enqueue(T&& data)
        {
            cell_t* cell;
            size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
            for (;;)
            {cell = &buffer_[pos & buffer_mask_];
                size_t seq = cell->sequence_.load(std::memory_order_acquire);
                intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
                if (dif == 0)
                {if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (dif < 0)
                {return false;}
                else
                {pos = enqueue_pos_.load(std::memory_order_relaxed);
                }
            }
            cell->data_ = std::move(data);
            cell->sequence_.store(pos + 1, std::memory_order_release);
            return true;
        }
    
        bool dequeue(T& data)
        {
            cell_t* cell;
            size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
            for (;;)
            {cell = &buffer_[pos & buffer_mask_];
                size_t seq =
                    cell->sequence_.load(std::memory_order_acquire);
                intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
                if (dif == 0)
                {if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (dif < 0)
                    return false;
                else
                    pos = dequeue_pos_.load(std::memory_order_relaxed);
            }
            data = std::move(cell->data_);
            cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
            return true;
        }
    

    buffer 数组。每个有 seq,data
    enqueue seq 前移 pos+1
    dequeue seq 前移 pos+1+mask 循环复用
    memory_order_relaxed: 不对执行顺序做保证
    memory_order_acquire: 本线程中, 所有后续的读操作必须在本条原子操作完成后执行 memory_order_release: 本线程中, 所有之前的写操作完成后才能执行本条原子操作 a.compare_exchange_weak(n,w): 比较 a 和 n, 如果相等,a 赋值为 w。不相等,n 赋值为 a, 返回 false

    buffer {sequence,data} enqueue_pos 两个和 cell 中的值一直加 1 dequeue_pos 同上

    为何一个 acquire 一个 relaxed 呢?pos 的 CAS 可以保证写的原子性。最低 relaxed。能保证单独操作原子,保证不了顺序, 这种对顺序的限制性能一定比锁好吗?这个只对单指令做限制,性能比锁好

退出移动版