boost::asio::io_service provides a post method for submitting a task to be executed asynchronously.
Since C++11, lambda expressions can capture local variables, either by value or by reference.
This example combines these two features to simulate Goroutines and channels.
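To make these two building blocks concrete before the full program, here is a minimal stand-alone sketch (not part of the project below). It assumes a Boost version where io_service and its post member are still available; newer releases prefer io_context and the free function boost::asio::post.

#include <boost/asio/io_service.hpp>
#include <iostream>

int main() {
    boost::asio::io_service ios;

    int answer = 42; // local variable captured by the lambdas below

    // post() only queues the task; nothing runs until run() is called.
    ios.post([answer] {                 // capture by value
        std::cout << "value: " << answer << '\n';
    });
    ios.post([&answer] { ++answer; });  // capture by reference

    ios.run(); // executes the queued tasks on the calling thread, in order

    std::cout << "answer is now " << answer << '\n'; // prints 43
}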
The directory structure of the program is as follows:

.
├── main.cpp
├── send_data
│   ├── consts.h
│   ├── package_data.h
│   └── send_data.h
└── utils
    ├── sync_queue.hpp
    ├── tasks_processor_base.hpp
    └── tasks_processor_multithread.hpp

The program code is listed below.
utils/sync_queue.hpp
#ifndef _SYNC_QUEUE_HPP_
#define _SYNC_QUEUE_HPP_

#include <list>
#include <mutex>
#include <condition_variable>

// A bounded, thread-safe queue that plays the role of a Go channel:
// producers block while the queue is full, consumers block while it is empty.
template <typename T>
class SyncQueue {
public:
    explicit SyncQueue(std::size_t maxSize) : m_maxSize(maxSize), m_needStop(false) {}

    void Put(const T& x) {
        Add(x);
    }

    void Put(T&& x) {
        Add(std::move(x));
    }

    // Take the entire buffer at once.
    void Take(std::list<T>& list) {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_NotEmpty.wait(locker, [this] {
            return m_needStop || NotEmpty();
        });
        if (m_needStop) {
            return;
        }
        list = std::move(m_queue);
        // The buffer is now empty, so every blocked producer may continue.
        m_NotFull.notify_all();
    }

    // Take a single element.
    void Take(T& t) {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_NotEmpty.wait(locker, [this] {
            return m_needStop || NotEmpty();
        });
        if (m_needStop) {
            return;
        }
        t = m_queue.front();
        m_queue.pop_front();
        m_NotFull.notify_one();
    }

    void Stop() {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        m_NotFull.notify_all();
        m_NotEmpty.notify_all();
    }

    bool Empty() {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full() {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() >= m_maxSize;
    }

private:
    // Lock-free "not full" check, used internally while the mutex is already
    // held; locking again here would deadlock.
    bool NotFull() const {
        return m_queue.size() < m_maxSize;
    }

    // Lock-free "not empty" check, same rule as NotFull().
    bool NotEmpty() const {
        return !m_queue.empty();
    }

    template <typename F>
    void Add(F&& x) {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_NotFull.wait(locker, [this] { return m_needStop || NotFull(); });
        if (m_needStop) {
            return;
        }
        m_queue.push_back(std::forward<F>(x));
        m_NotEmpty.notify_one();
    }

private:
    std::list<T> m_queue;               // buffer
    std::mutex m_mutex;                 // mutex used together with the condition variables
    std::condition_variable m_NotEmpty; // signalled when the queue becomes non-empty
    std::condition_variable m_NotFull;  // signalled when the queue becomes non-full
    std::size_t m_maxSize;              // capacity of the queue
    bool m_needStop;                    // stop flag
};

#endif // _SYNC_QUEUE_HPP_
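As a quick way to see SyncQueue behave like a buffered channel on its own, two plain std::threads can exchange values through it. This snippet is not part of the project; it is only a usage sketch against the header above.

#include "utils/sync_queue.hpp"
#include <thread>
#include <iostream>

int main() {
    SyncQueue<int> ch(4); // a "channel" with buffer size 4

    std::thread producer([&ch] {
        for (int i = 1; i <= 10; ++i) {
            ch.Put(i); // blocks while the buffer is full
        }
    });

    std::thread consumer([&ch] {
        for (int i = 0; i < 10; ++i) {
            int v = 0;
            ch.Take(v); // blocks while the buffer is empty
            std::cout << "received " << v << '\n';
        }
    });

    producer.join();
    consumer.join();
}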
utils/tasks_processor_base.hpp
#ifndef BOOK_CHAPTER6_TASK_PROCESSOR_BASE_HPP
#define BOOK_CHAPTER6_TASK_PROCESSOR_BASE_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
#  pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/thread/thread.hpp>
#include <boost/noncopyable.hpp>
#include <boost/asio/io_service.hpp>
#include <iostream>

namespace detail {

// Wraps a task so that exceptions and thread interruptions cannot escape
// into io_service::run().
template <class T>
struct task_wrapped {
private:
    T task_unwrapped_;

public:
    explicit task_wrapped(const T& task_unwrapped)
        : task_unwrapped_(task_unwrapped)
    {}

    void operator()() const {
        // Resetting interruption
        try {
            boost::this_thread::interruption_point();
        } catch (const boost::thread_interrupted&) {}

        try {
            // Executing task
            task_unwrapped_();
        } catch (const std::exception& e) {
            std::cerr << "Exception: " << e.what() << '\n';
        } catch (const boost::thread_interrupted&) {
            std::cerr << "Thread interrupted\n";
        } catch (...) {
            std::cerr << "Unknown exception\n";
        }
    }
};

template <class T>
inline task_wrapped<T> make_task_wrapped(const T& task_unwrapped) {
    return task_wrapped<T>(task_unwrapped);
}

} // namespace detail

namespace tp_base {

class tasks_processor : private boost::noncopyable {
protected:
    boost::asio::io_service ios_;

    tasks_processor()
        : ios_()
    {}

public:
    // Declared for the singleton interface; this example only ever uses the
    // derived processor's get() (see tasks_processor_multithread.hpp).
    static tasks_processor& get();

    // Posts a task to the io_service; it runs later on whichever thread is
    // executing run().
    template <class T>
    inline void push_task(const T& task_unwrapped) {
        ios_.post(detail::make_task_wrapped(task_unwrapped));
    }

    void start() {
        ios_.run();
    }

    void stop() {
        ios_.stop();
    }
}; // tasks_processor

} // namespace tp_base

#endif // BOOK_CHAPTER6_TASK_PROCESSOR_BASE_HPP
utils/tasks_processor_multithread.hpp
#ifndef BOOK_CHAPTER6_TASK_PROCESSOR_MULTITHREAD_HPP
#define BOOK_CHAPTER6_TASK_PROCESSOR_MULTITHREAD_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
#  pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "tasks_processor_base.hpp"

#include <algorithm> // for std::max

namespace tp_multithread {

class tasks_processor : public tp_base::tasks_processor {
public:
    static tasks_processor& get();

    // Default value will attempt to guess the optimal count of threads.
    void start_multiple(std::size_t threads_count = 0) {
        if (!threads_count) {
            threads_count = (std::max)(static_cast<int>(
                boost::thread::hardware_concurrency()), 1);
        }

        // One thread is the current thread.
        --threads_count;

        boost::thread_group tg;
        for (std::size_t i = 0; i < threads_count; ++i) {
            // A lambda avoids the ambiguity of taking the address of the
            // overloaded io_service::run member function.
            tg.create_thread([this] { ios_.run(); });
        }

        ios_.run();
        tg.join_all();
    }
};

tasks_processor& tasks_processor::get() {
    static tasks_processor proc;
    return proc;
}

} // namespace tp_multithread

#endif // BOOK_CHAPTER6_TASK_PROCESSOR_MULTITHREAD_HPP
send_data/consts.h
#ifndef _SEND_DATA_CONSTS_H_
#define _SEND_DATA_CONSTS_H_

static const int DATA_SIZE = 3000; // number of elements in each batch
static const int TASK_SIZE = 3;    // number of producer/consumer task pairs

#endif // _SEND_DATA_CONSTS_H_
send_data/package_data.h
#ifndef _PACKAGE_DATA_H_
#define _PACKAGE_DATA_H_

#include "../utils/sync_queue.hpp"
#include <vector>

// Producer side: pushes a batch of integers into the shared SyncQueue,
// much like sending values into a Go channel.
class PackageData {
private:
    std::vector<int> input_;
    SyncQueue<int>& data_queue_;

public:
    PackageData(std::vector<int> input, SyncQueue<int>& data_queue)
        : input_(std::move(input)), data_queue_(data_queue)
    {}

    void package_data() const {
        for (std::size_t i = 0; i < input_.size(); ++i) {
            data_queue_.Put(input_[i]);
        }
    }
};

#endif // _PACKAGE_DATA_H_
send_data/send_data.h
#ifndef _SEND_DATA_H_
#define _SEND_DATA_H_

#include "consts.h"
#include "../utils/sync_queue.hpp"
#include <iostream>

// Consumer side: takes DATA_SIZE values out of the shared SyncQueue and
// "sends" them by printing to stderr.
class SendData {
private:
    SyncQueue<int>& data_queue_;

public:
    explicit SendData(SyncQueue<int>& data_queue) : data_queue_(data_queue) {}

    void send_data() {
        int ele;
        for (int i = 0; i < DATA_SIZE; ++i) {
            data_queue_.Take(ele);
            std::cerr << "data: " << ele << " has been sent!" << std::endl;
        }
    }
};

#endif // _SEND_DATA_H_
main.cpp
#include "utils/tasks_processor_multithread.hpp"
#include "utils/sync_queue.hpp"
#include "send_data/consts.h"
#include "send_data/send_data.h"
#include "send_data/package_data.h"

#include <vector>

using namespace tp_multithread;

int main(int argc, char* argv[]) {
    // The queue plays the role of a buffered channel large enough to hold
    // every value, so producers never block on a full queue.
    SyncQueue<int> d_queue(TASK_SIZE * DATA_SIZE);

    for (int i = 0; i < TASK_SIZE; ++i) {
        // Build one batch of DATA_SIZE consecutive integers.
        std::vector<int> a;
        for (int j = 0; j < DATA_SIZE; ++j) {
            int ele = 1 + i * DATA_SIZE + j;
            a.emplace_back(ele);
        }

        // Producer "goroutine": captures the batch by value and pushes it
        // into the queue.
        tasks_processor::get().push_task([a, &d_queue]() {
            PackageData pack_data(a, d_queue);
            pack_data.package_data();
        });

        // Consumer "goroutine": reads DATA_SIZE values back out of the queue.
        tasks_processor::get().push_task([&d_queue]() {
            SendData send_data(d_queue);
            send_data.send_data();
        });
    }

    // Run the posted tasks on a pool of threads.
    tasks_processor::get().start_multiple();
}
The program splits the numbers 1-9000 into three batches, puts them into the SyncQueue, and then reads all 9000 values back out and prints them.
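Assuming Boost is installed in a standard location, a build command along these lines should work (the output name is arbitrary, and library names and flags vary by platform and Boost version):

g++ -std=c++11 main.cpp -o channel_demo -lboost_thread -lboost_system -pthread
./channel_demo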