boost::thread_group前两天讲过了,比较简单,其实就是一个boost::list<boost::shared_ptr<boost::thread>> threads类型对象的一个包装,封装一组线程列表。
同步队列的实现也是比较简单的使用了C++11提供的std::mutex和std::unique_lock对象,如果要兼容C++11之前的版本,可以将相关对象的名称空间换成 boost就可以。
下面上代码,
工程结构如下,
utils/sync_queue.hpp
#ifndef _SYNC_QUEUE_HPP_
#define _SYNC_QUEUE_HPP_
#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
using namespace std;
template <typename T>
class SyncQueue {
public:
SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) {}
void Put(const T& x) {
Add(x);
}
void Put(T&& x) {
Add(std::forward<T>(x));
}
void Take(std::list<T>& list) {
std::unique_lock<std::mutex> locker(m_mutex);
m_NotEmpty.wait(locker, [this] {
return m_needStop || NotEmpty();
});
if(m_needStop) {
return;
}
list = std::move(m_queue);
m_NotFull.notify_one();
}
void Take(T& t) {
std::unique_lock<std::mutex> locker(m_mutex);
m_NotEmpty.wait(locker, [this] {
return m_needStop || NotEmpty();
});
if(m_needStop) {
return;
}
t = m_queue.front();
m_queue.pop_front();
m_NotFull.notify_one();
}
void Stop() {
{
std::lock_guard<std::mutex> locker(m_mutex);
m_needStop = true;
}
m_NotFull.notify_all();
m_NotEmpty.notify_all();
}
bool Empty() {
std::lock_guard<std::mutex> locker;
return m_queue.empty();
}
bool Full() {
std::lock_guard<std::mutex> locker;
return m_queue.size() == m_maxSize;
}
private:
// 判断队列未满,内部使用的无锁版,否则会发生死锁
bool NotFull() const {
bool full = m_queue.size() >= m_maxSize;
return !full;
}
bool NotEmpty() const {
bool empty = m_queue.empty();
return !empty;
}
template <typename F>
void Add(F&& x) {
std::unique_lock<std::mutex> locker(m_mutex);
m_NotFull.wait(locker, [this]{ return m_needStop || NotFull(); });
if(m_needStop) {
return;
}
m_queue.push_back(std::forward<F>(x));
m_NotEmpty.notify_one();
}
private:
std::list<T> m_queue; // 缓冲区
std::mutex m_mutex; // 互斥量和条件变量结合起来使用
std::condition_variable m_NotEmpty; // 不为空的条件变量
std::condition_variable m_NotFull; // 不为满的条件变量
int m_maxSize; // 同步队列的最大大小
bool m_needStop; // 停止标志
};
#endif
send_data/consts.h
#ifndef _SEND_DATA_CONSTS_H_
#define _SEND_DATA_CONSTS_H_
namespace send_data {
static const int DATA_SIZE = 3000;
static const int THREAD_SIZE = 3;
};
#endif
send_data/package_data.h
#ifndef _PACKAGE_DATA_H_
#define _PACKAGE_DATA_H_
#include "../utils/sync_queue.hpp"
#include <boost/atomic.hpp>
#include <vector>
#include <algorithm>
namespace send_data
{
class PackageData
{
private:
std::vector<int> input_;
SyncQueue<int> &data_queue_;
public:
PackageData(std::vector<int> input, SyncQueue<int> &data_queue) : input_(input), data_queue_(data_queue)
{
}
void package_data() const
{
for (int i = 0; i < input_.size(); ++i)
{
data_queue_.Put(input_[i]);
}
}
};
};
#endif
send_data/send_data.h
#ifndef _SEND_DATA_H_
#define _SEND_DATA_H_
#include "consts.h"
#include "../utils/sync_queue.hpp"
namespace send_data
{
class SendData
{
private:
SyncQueue<int> &data_queue_;
public:
SendData(SyncQueue<int> &data_queue) : data_queue_(data_queue) {}
void send_data()
{
int ele;
for (int i = 0; i < DATA_SIZE; ++i)
{
data_queue_.Take(ele);
std::cerr << "data: " << ele << " is been sent!" << std::endl;
}
}
};
};
#endif
main.cpp
#include "send_data/package_data.h"
#include "send_data/send_data.h"
#include "utils/sync_queue.hpp"
#include <boost/thread.hpp>
using namespace send_data;
int main(int argc, char* argv[]) {
boost::thread_group package_data_g;
boost::thread_group send_data_g;
SyncQueue<int> d_queue(THREAD_SIZE * DATA_SIZE);
// std::vector<int> a{1,2,3};
// // 1 + 0 *3
// // 1 + 1 *3
// // 1 + 2
// std::vector<int> b{4,5, 6};
// std::vector<int> c{7,8,9};
for(int i=0; i<THREAD_SIZE; ++i) {
std::vector<int> a;
for(int j=0; j<DATA_SIZE; ++j) {
a.emplace_back(1 + i*DATA_SIZE +j);
}
PackageData pkg_data(a, d_queue);
package_data_g.create_thread([pkg_data]() {
pkg_data.package_data();
});
}
SendData sdata(d_queue);
for(int i=0; i<THREAD_SIZE; ++i) {
send_data_g.create_thread([&sdata]() {
sdata.send_data();
});
}
package_data_g.join_all();
send_data_g.join_all();
}
程序会将 1-9000拆成三个批次放入 sync_queue,然后从sync_queue中把1-9000全部读出来,如下图所示,
这种方式存在一些限制,比如 打包的线程和发送的线程 个数必须一样,不然会造成条件变量的死等。
所以生产实践中不建议使用,可以使用下一节讲述的boost::asio::io_service对象来处理,效果更好,限制更少。
有疑问加站长微信联系(非本文作者)