使用boost::asio::io_service和同步队列模拟Golang goroutine和channel

FredricZhu · · 446 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

boost::asio::io_service有一个post方法,可以提交任务,异步执行。
C++11之后支持lambda表达式,可以捕获局部变量,支持值捕获和引用捕获。
本例主要基于这两点来模拟Goroutine和channel。
程序目录结构如下,


图片.png

程序代码如下,
utils/sync_queue.hpp

#ifndef _SYNC_QUEUE_HPP_
#define _SYNC_QUEUE_HPP_

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>

using namespace std;

/// Bounded, thread-safe FIFO queue built on a mutex and two condition
/// variables.
///
/// Producers block in Put() while the queue holds `maxSize` elements;
/// consumers block in Take() while it is empty. Stop() wakes every waiter,
/// after which Put() drops its argument and Take() returns without touching
/// its output parameter.
template <typename T>
class SyncQueue {
    public:

        /// @param maxSize maximum number of elements held before Put() blocks.
        SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) {}

        /// Appends a copy of `x`, blocking while the queue is full.
        /// Does nothing once Stop() has been called.
        void Put(const T& x) {
            Add(x);
        }

        /// Appends `x` by moving it, blocking while the queue is full.
        /// Does nothing once Stop() has been called.
        void Put(T&& x) {
            // `x` is a plain rvalue reference here (T is the class parameter,
            // not a deduced one), so std::move is the accurate cast.
            Add(std::move(x));
        }

        /// Moves every queued element into `list`, blocking while the queue
        /// is empty. `list` is left untouched if the queue has been stopped.
        void Take(std::list<T>& list) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotEmpty.wait(locker, [this] {
                        return m_needStop || NotEmpty();
                    });

            if(m_needStop) {
                return;
            }

            list = std::move(m_queue);
            // A moved-from list is only "valid but unspecified"; make the
            // empty state explicit before other threads look at it.
            m_queue.clear();
            // The whole buffer was drained, so every blocked producer may
            // proceed, not just one.
            m_NotFull.notify_all();
        }

        /// Pops the oldest element into `t`, blocking while the queue is
        /// empty. `t` is left untouched if the queue has been stopped.
        void Take(T& t) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotEmpty.wait(locker, [this] {
                        return m_needStop || NotEmpty();
                    });

            if(m_needStop) {
                return;
            }

            t = m_queue.front();
            m_queue.pop_front();
            m_NotFull.notify_one();
        }

        /// Sets the stop flag and wakes all blocked producers and consumers.
        void Stop() {
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                m_needStop = true;
            }

            // Notify outside the lock so woken threads can acquire it at once.
            m_NotFull.notify_all();
            m_NotEmpty.notify_all();
        }

        /// @return true when no element is queued (snapshot taken under the lock).
        bool Empty() {
            std::lock_guard<std::mutex> locker(m_mutex);
            return m_queue.empty();
        }

        /// @return true when the queue holds `maxSize` or more elements
        ///         (snapshot taken under the lock).
        bool Full() {
            std::lock_guard<std::mutex> locker(m_mutex);
            return !NotFull();
        }

    private:

        // Lock-free helpers: callers must already hold m_mutex, otherwise
        // locking again here would deadlock.
        bool NotFull() const {
            typedef typename std::list<T>::size_type size_type;
            return m_queue.size() < static_cast<size_type>(m_maxSize);
        }

        bool NotEmpty() const {
            return !m_queue.empty();
        }

        /// Shared implementation of both Put() overloads; forwards `x` into
        /// the buffer once there is room, or drops it if the queue is stopped.
        template <typename F>
        void Add(F&& x) {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_NotFull.wait(locker, [this]{ return m_needStop || NotFull(); });

            if(m_needStop) {
                return;
            }

            m_queue.push_back(std::forward<F>(x));
            m_NotEmpty.notify_one();
        }

    private:
        std::list<T> m_queue; // buffered elements, oldest at the front
        std::mutex m_mutex; // guards every member below and m_queue
        std::condition_variable m_NotEmpty; // signalled when an element is added
        std::condition_variable m_NotFull; // signalled when room becomes available
        int m_maxSize; // capacity limit of the queue
        bool m_needStop; // set by Stop(); makes Put()/Take() return early
};
#endif

utils/tasks_processor_base.hpp

#ifndef BOOK_CHAPTER6_TASK_PROCESSOR_BASE_HPP
#define BOOK_CHAPTER6_TASK_PROCESSOR_BASE_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/thread/thread.hpp>
#include <boost/noncopyable.hpp>
#include <boost/asio/io_service.hpp>

#include <iostream>

namespace detail {

/// Callable adapter around a user task of type T.
///
/// Invoking it first resets any pending Boost.Thread interruption, then runs
/// the stored task and reports anything it throws on std::cerr instead of
/// letting the exception leave the call.
template <class T>
struct task_wrapped {
public:
    /// Stores a copy of `task_unwrapped` for later invocation.
    explicit task_wrapped(const T& task_unwrapped)
        : task_(task_unwrapped)
    {}

    /// Runs the stored task; never propagates an exception to the caller.
    void operator()() const {
        // Resetting interruption: the thread_interrupted exception raised at
        // the interruption point, if any, is deliberately swallowed.
        try {
            boost::this_thread::interruption_point();
        } catch (const boost::thread_interrupted&) {
        }

        try {
            task_();
        } catch (const std::exception& e) {
            std::cerr << "Exception: " << e.what() << '\n';
        } catch (const boost::thread_interrupted&) {
            std::cerr << "Thread interrupted\n";
        } catch (...) {
            std::cerr << "Unknown exception\n";
        }
    }

private:
    T task_;  // the user-supplied callable
};

/// Deduces T from the argument and returns it wrapped in task_wrapped<T>.
template <class T>
inline task_wrapped<T> make_task_wrapped(const T& task_unwrapped) {
    task_wrapped<T> wrapped(task_unwrapped);
    return wrapped;
}

} // namespace detail


namespace tp_base {

class tasks_processor: private boost::noncopyable {
protected:
    boost::asio::io_service         ios_;
    
    tasks_processor()
        : ios_()
    {}

public:
    static tasks_processor& get();

    template <class T>
    inline void push_task(const T& task_unwrapped) {
        ios_.post(detail::make_task_wrapped(task_unwrapped));
    }

    void start() {
        ios_.run();
    }

    void stop() {
        ios_.stop();
    }
}; // tasks_processor

} // namespace base::

#endif // BOOK_CHAPTER6_TASK_PROCESSOR_BASE_HPP

utils/tasks_processor_multithread.hpp

#ifndef BOOK_CHAPTER6_TASK_PROCESSOR_MULTITHREAD_HPP
#define BOOK_CHAPTER6_TASK_PROCESSOR_MULTITHREAD_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
#pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "tasks_processor_base.hpp"

namespace tp_multithread
{

    class tasks_processor : public tp_base::tasks_processor
    {
    public:
        static tasks_processor &get();

        // Default value will attempt to guess optimal count of threads
        void start_multiple(std::size_t threads_count = 0)
        {
            if (!threads_count)
            {
                threads_count = (std::max)(static_cast<int>(
                                               boost::thread::hardware_concurrency()),
                                           1);
            }

            // one thread is the current thread
            --threads_count;

            boost::thread_group tg;
            for (std::size_t i = 0; i < threads_count; ++i)
            {
                tg.create_thread(boost::bind(&boost::asio::io_service::run, boost::ref(ios_)));
            }

            ios_.run();
            tg.join_all();
        }
    };

    tasks_processor &tasks_processor::get()
    {
        static tasks_processor proc;
        return proc;
    }
} // namespace tp_multithread

#endif // BOOK_CHAPTER6_TASK_PROCESSOR_MULTITHREAD_HPP

send_data/consts.h

#ifndef _SEND_DATA_CONSTS_H_
#define _SEND_DATA_CONSTS_H_

// Number of integers handled by a single producer or consumer task.
static constexpr int DATA_SIZE = 3000;
// Number of producer/consumer task pairs submitted by main().
static constexpr int TASK_SIZE = 3;

#endif

send_data/package_data.h

#ifndef _PACKAGE_DATA_H_
#define _PACKAGE_DATA_H_
#include "../utils/sync_queue.hpp"
#include <boost/atomic.hpp>

#include <algorithm>
#include <iostream>
#include <utility>
#include <vector>

/// Producer side of the demo: pushes a fixed batch of integers into a
/// shared SyncQueue<int>.
class PackageData
{
private:
    std::vector<int> input_;     // values to enqueue, in order
    SyncQueue<int> &data_queue_; // destination queue; not owned, must outlive this object

public:
    /// @param input      values to enqueue; taken by value and moved into the
    ///                   object so the caller's copy is the only copy made.
    /// @param data_queue queue that package_data() writes to.
    PackageData(std::vector<int> input, SyncQueue<int> &data_queue)
        : input_(std::move(input)), data_queue_(data_queue)
    {
    }

    /// Puts every stored value into the queue in order. Each Put() blocks
    /// while the queue is full.
    void package_data() const
    {
        // Range-for avoids the signed/unsigned comparison of an int index
        // against vector::size().
        for (const int value : input_)
        {
            data_queue_.Put(value);
        }
    }
};

#endif

send_data/send_data.h

#ifndef _SEND_DATA_H_
#define _SEND_DATA_H_

#include "consts.h"
#include "../utils/sync_queue.hpp"

/// Consumer side of the demo: takes integers from a shared SyncQueue<int>
/// and reports each one on std::cerr.
class SendData
{
private:
    SyncQueue<int> &data_queue_; // source queue; not owned, must outlive this object

public:
    /// @param data_queue queue that send_data() reads from.
    SendData(SyncQueue<int> &data_queue) : data_queue_(data_queue) {}

    /// Takes DATA_SIZE elements from the queue, one at a time, printing each.
    /// Each Take() blocks while the queue is empty.
    void send_data()
    {
        // Initialised because Take() leaves its argument untouched when the
        // queue has been stopped; printing an indeterminate int would be
        // undefined behaviour.
        int ele = 0;
        for (int i = 0; i < DATA_SIZE; ++i)
        {
            data_queue_.Take(ele);
            std::cerr << "data: " << ele << " is been sent!" << std::endl;
        }
    }
};

#endif

main.cpp

#include "utils/tasks_processor_multithread.hpp"
#include "utils/sync_queue.hpp"
#include "send_data/consts.h"
#include "send_data/send_data.h"
#include "send_data/package_data.h"

#include <boost/lexical_cast.hpp>

using namespace tp_multithread;

/// Demo entry point: submits TASK_SIZE producer/consumer task pairs that
/// exchange the values 1 .. TASK_SIZE * DATA_SIZE through one shared queue,
/// then runs them on the multithreaded task processor.
int main(int /*argc*/, char * /*argv*/[])
{
    // Captured by reference in the tasks below; this is safe because
    // start_multiple() joins its worker threads before main() returns.
    SyncQueue<int> d_queue(TASK_SIZE * DATA_SIZE);

    // `i` is an int like the constants it is combined with, so the element
    // computation involves no size_t -> int narrowing.
    for (int i = 0; i < TASK_SIZE; ++i)
    {
        std::vector<int> a;
        a.reserve(DATA_SIZE);
        for (int j = 0; j < DATA_SIZE; ++j)
        {
            a.push_back(1 + i * DATA_SIZE + j);
        }

        // Producer: enqueues this batch.
        tasks_processor::get().push_task([a, &d_queue]() {
            PackageData pack_data(a, d_queue);
            pack_data.package_data();
        });

        // Consumer: dequeues and prints DATA_SIZE values.
        tasks_processor::get().push_task([&d_queue]() {
            SendData send_data(d_queue);
            send_data.send_data();
        });
    }

    tasks_processor::get().start_multiple();
    return 0;
}

程序会将 1-9000拆成三个批次放入 sync_queue,然后从sync_queue中把1-9000全部读出来,如下图所示


图片.png

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:FredricZhu

查看原文:使用boost::asio::io_service和同步队列模拟Golang goroutine和channel

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

446 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传