使用boost::thread_group和同步队列模拟Golang goroutine和channel

FredricZhu · · 628 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

boost::thread_group前两天讲过了,比较简单,其实就是一个boost::list<boost::shared_ptr<boost::thread>> threads类型对象的一个包装,封装一组线程列表。
同步队列的实现也是比较简单的使用了C++11提供的std::mutex和std::unique_lock对象,如果要兼容C++11之前的版本,可以将相关对象的名称空间换成 boost就可以。
下面上代码,
工程结构如下,


图片.png

utils/sync_queue.hpp

#ifndef _SYNC_QUEUE_HPP_
#define _SYNC_QUEUE_HPP_

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>

using namespace std;

template <typename T> 
class SyncQueue {
    public:
        
        SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) {}

        void Put(const T& x) {
            Add(x);
        }

        void Put(T&& x) {
            Add(std::forward<T>(x));
        }
        
        void Take(std::list<T>& list) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotEmpty.wait(locker, [this] {
                        return m_needStop || NotEmpty();
                    });

            if(m_needStop) {
                return;
            }

            list = std::move(m_queue);
            m_NotFull.notify_one();
        }


        void Take(T& t) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotEmpty.wait(locker, [this] {
                        return m_needStop || NotEmpty();
                    });
            
            if(m_needStop) {
                return;
            }

            t = m_queue.front();
            m_queue.pop_front();
            m_NotFull.notify_one();
        }

        void Stop() {
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                m_needStop = true;
            }

            m_NotFull.notify_all();
            m_NotEmpty.notify_all();
        }
        
        bool Empty() {
            std::lock_guard<std::mutex> locker;
            return m_queue.empty();
        }

        bool Full() {
            std::lock_guard<std::mutex> locker;
            return m_queue.size() == m_maxSize;
        }
    
    private:
        
        // 判断队列未满,内部使用的无锁版,否则会发生死锁
        bool NotFull() const {
            bool full = m_queue.size() >= m_maxSize;
            return !full;
        }

        bool NotEmpty() const {
            bool empty = m_queue.empty();
            return !empty;
        }

        template <typename F> 
        void Add(F&& x) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotFull.wait(locker, [this]{ return m_needStop || NotFull(); });
            
            if(m_needStop) {
                return;
            }

            m_queue.push_back(std::forward<F>(x));
            m_NotEmpty.notify_one();
        }

    private:
        std::list<T> m_queue; // 缓冲区
        std::mutex m_mutex; // 互斥量和条件变量结合起来使用
        std::condition_variable m_NotEmpty; // 不为空的条件变量
        std::condition_variable m_NotFull; // 不为满的条件变量
        int m_maxSize; // 同步队列的最大大小
        bool m_needStop; // 停止标志
};
#endif

send_data/consts.h

#ifndef _SEND_DATA_CONSTS_H_
#define _SEND_DATA_CONSTS_H_

namespace send_data {
    static const int DATA_SIZE = 3000;
    static const int THREAD_SIZE = 3;
};
#endif

send_data/package_data.h

#ifndef _PACKAGE_DATA_H_
#define _PACKAGE_DATA_H_
#include "../utils/sync_queue.hpp"
#include <boost/atomic.hpp>

#include <vector>
#include <algorithm>
namespace send_data
{

    class PackageData
    {
    private:
        std::vector<int> input_;
        SyncQueue<int> &data_queue_;

    public:
        PackageData(std::vector<int> input, SyncQueue<int> &data_queue) : input_(input), data_queue_(data_queue)
        {
        }

        void package_data() const
        {
            for (int i = 0; i < input_.size(); ++i)
            {
                data_queue_.Put(input_[i]);
            }
        }
    };
};

#endif

send_data/send_data.h

#ifndef _SEND_DATA_H_
#define _SEND_DATA_H_
#include "consts.h"
#include "../utils/sync_queue.hpp"
namespace send_data
{
    class SendData
    {
    private:

        SyncQueue<int> &data_queue_;

    public:
        SendData(SyncQueue<int> &data_queue) : data_queue_(data_queue) {}

        void send_data()
        {
            int ele;
            for (int i = 0; i < DATA_SIZE; ++i)
            {
                data_queue_.Take(ele);
                std::cerr << "data: " << ele << " is been sent!" << std::endl;
            }
        }
    };
};

#endif

main.cpp

#include "send_data/package_data.h"
#include "send_data/send_data.h"
#include "utils/sync_queue.hpp"

#include <boost/thread.hpp>

using namespace send_data;

int main(int argc, char* argv[]) {
    boost::thread_group package_data_g;
    boost::thread_group send_data_g;

    SyncQueue<int> d_queue(THREAD_SIZE * DATA_SIZE);

    // std::vector<int> a{1,2,3};
    //   // 1 + 0 *3
    //   // 1 + 1 *3 
    //   // 1 + 2
    // std::vector<int> b{4,5, 6};
    // std::vector<int> c{7,8,9};
    
    for(int i=0; i<THREAD_SIZE; ++i) {
        std::vector<int> a;
        for(int j=0; j<DATA_SIZE; ++j) {
            a.emplace_back(1 + i*DATA_SIZE +j);
        }

        PackageData pkg_data(a, d_queue);
        package_data_g.create_thread([pkg_data]() {
            pkg_data.package_data();
        });
    }
   
    SendData sdata(d_queue);
    for(int i=0; i<THREAD_SIZE; ++i) {    
        send_data_g.create_thread([&sdata]() {
            sdata.send_data();
        });
    }

    package_data_g.join_all();
    send_data_g.join_all();
}

程序会将 1-9000拆成三个批次放入 sync_queue,然后从sync_queue中把1-9000全部读出来,如下图所示,


图片.png

这种方式存在一些限制,比如 打包的线程和发送的线程 个数必须一样,不然会造成条件变量的死等。
所以生产实践中不建议使用,可以使用下一节讲述的boost::asio::io_service对象来处理,效果更好,限制更少。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:FredricZhu

查看原文:使用boost::thread_group和同步队列模拟Golang goroutine和channel

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

628 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传