C++实现golang chan 版本一

eclipser1987 · · 2207 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

这是我的一个简单例子,目前不够完善。

需要后续的优化,如,线程锁的更换,selector模式的加入,以及每个Chan依赖一个独立的线程,性能问题(一个线程管理多个Chan)。chan如何主动结束,程序结束时chan如何结束(通过数据未处理完)等等情况。阻塞式通道和非阻塞通道的设计!!!


Lock.h

#pragma once

#if defined(__cplusplus) && (__cplusplus >= 201103)
#include <thread>
typedef std::thread::id TID;
#else
// #error "not support c++11"
#if defined(WIN32) || defined(WIN64)
#include <windows.h>
typedef DWORD TID;
#elif defined(__linux__)
#include <pthread.h>  // 需要 -pthread
#include <unistd.h>
typedef pthread_t TID ;
#endif
#endif

#if defined(__cplusplus) && (__cplusplus >= 201103)
#include <mutex>
class VLock {
	std::mutex m_Lock; 
public:
	void Lock() {m_Lock.lock();}
	void UnLock() {m_Lock.unlock();};
	bool TryLock() {return m_Lock.try_lock();};
};
#else
//#error "not support c++11"
#if defined(WIN32) || defined(WIN64)
class VLock {
	CRITICAL_SECTION m_Lock ;
public:
	VLock() {InitializeCriticalSection(&m_Lock);}
	~VLock() {DeleteCriticalSection(&m_Lock);}
	void Lock() {EnterCriticalSection(&m_Lock);}
	void UnLock() {LeaveCriticalSection(&m_Lock);};
};
#elif defined(__linux__)
class VLock {
	pthread_mutex_t 	m_Mutex; 
public:
	VLock() {pthread_mutex_init( &m_Mutex , NULL );}
	~VLock() {pthread_mutex_destroy( &m_Mutex) ;}
	void Lock() {pthread_mutex_lock(&m_Mutex);}
	void UnLock() {pthread_mutex_unlock(&m_Mutex);};
};
#endif
#endif



//自动加锁解锁器
class AutoLock_T
{
public:
	AutoLock_T(VLock& rLock)
	{
		m_pLock = &rLock;
		m_pLock->Lock();
	}
	~AutoLock_T()
	{
		m_pLock->UnLock();
	}
protected:
private:
	AutoLock_T();
	VLock* m_pLock;
};



//当前线程挂起一定时间
extern void VSleep( unsigned int millionseconds = 0 ) ;

//获得当前线程ID
extern TID VGetCurrentThreadId( ) ;


Lock.cpp

#include "Lock.h"


void VSleep( unsigned int millionseconds ) {
#if defined(__cplusplus) && (__cplusplus >= 201103)
	std::this_thread::sleep_for(std::chrono::milliseconds(millionseconds));
#else
	// #error "not support c++11"
#if defined(WIN32) || defined(WIN64)
	Sleep( millionseconds ) ;
#elif defined(__linux__)
	usleep( millionseconds*1000 ) ;
#endif
#endif

}

TID VGetCurrentThreadId() {
#if defined(__cplusplus) && (__cplusplus >= 201103)
	return std::this_thread::get_id();
#else
	// #error "not support c++11"
#if defined(WIN32) || defined(WIN64)
	return GetCurrentThreadId( ) ;
#elif defined(__linux__)
	return pthread_self();
#endif
#endif

}

Thread.h

#pragma once

#include "Lock.h"

//当定义此宏时,所有线程将只执行一次后就推出。
//#define _EXEONECE 10

#if defined(WIN32) || defined(WIN64)
DWORD WINAPI MyThreadProcess( void* derivedThread );
#elif defined(__linux__)
void* MyThreadProcess ( void * derivedThread );
#endif


class Thread {

public :

	enum ThreadStatus 
	{
		READY ,		// 当前线程处于准备状态
		RUNNING ,	// 处于运行状态
		EXITING ,	// 线程正在退出
		EXIT		// 已经退出 
	};

public :

	Thread ( ) ;

	virtual ~Thread () ;

public :

	void start () ;

	virtual void stop () ;

	void exit ( void * retval = NULL ) ;

	virtual void run () ;
public :
	TID getTID () const { return m_TID; }

	ThreadStatus getStatus () const { return m_Status; }

	void setStatus ( ThreadStatus status ) { m_Status = status; }


#if defined(__cplusplus) && (__cplusplus >= 201103)
	bool joinable() const {
		return m_hThread->joinable();
	}
#endif


#if defined(__cplusplus) && (__cplusplus >= 201103)
	void join() {
		return m_hThread->join();
	}
#endif

private :

	TID m_TID;

	ThreadStatus m_Status;

#if defined(__cplusplus) && (__cplusplus >= 201103)
	std::thread* m_hThread;
#else
	// #error "not support c++11"
#if defined(WIN32) || defined(WIN64)
	HANDLE m_hThread ;
#endif
#endif


};

extern unsigned int g_QuitThreadCount ;


Thread.cpp

#include "Thread.h"


unsigned int g_QuitThreadCount = 0 ;
VLock g_thread_lock ;

Thread::Thread ( ) {
	try {
#if defined(__cplusplus) && (__cplusplus >= 201103)
#else
		m_TID		= 0 ;
#endif
		m_Status	= Thread::READY ;
#if defined(__cplusplus) && (__cplusplus >= 201103)
		m_hThread = new std::thread(MyThreadProcess, this);
		m_TID = m_hThread->get_id();
#else
		// #error "not support c++11"
#if defined(WIN32) || defined(WIN64)
		m_hThread = NULL ;
#elif defined(__linux__)
#endif
#endif
	} catch (...) {

	}
}

Thread::~Thread () {
}

void Thread::start() {
	try {
		if ( m_Status != Thread::READY )
			return ;

		m_Status = Thread::RUNNING;

#if defined(__cplusplus) && (__cplusplus >= 201103)
		m_hThread->detach();
#else
		// #error "not support c++11"
#if defined(WIN32) || defined(WIN64)
		m_hThread = CreateThread( NULL, 0, MyThreadProcess , this, 0, &m_TID ) ;
#elif defined(__linux__)
		pthread_create( &m_TID, NULL , MyThreadProcess , this );
#endif
#endif

	} catch (...) {

	}
}

void Thread::stop() {


}

void Thread::exit(void* retval) {
	try {
#if defined(__cplusplus) && (__cplusplus >= 201103)
		delete m_hThread;
		m_hThread = NULL;
#else
		// #error "not support c++11"
#if defined(WIN32) || defined(WIN64)
		CloseHandle( m_hThread ) ;
#elif defined(__linux__)
		pthread_exit( retval );
#endif
#endif
	} catch (...) {

	}
}


void Thread::run( ) {

}

//===================



#if defined(WIN32) || defined(WIN64)

DWORD WINAPI MyThreadProcess(  VOID* derivedThread )
{
	try
	{
		Thread * thread = (Thread *) derivedThread;
		if( thread == NULL )
			return 0;

		thread->run();

		thread->setStatus(Thread::EXIT);

		thread->exit(NULL);

		g_thread_lock.Lock() ;
		g_QuitThreadCount++ ;
		g_thread_lock.UnLock() ;
	} catch (...) {
	}

	return 0;	// avoid compiler's warning
}

#elif defined(__linux__)

void * MyThreadProcess ( void * derivedThread ) {
	try {

		Thread * thread = (Thread *)derivedThread;
		if( thread==NULL )
			return NULL;

		thread->run();

		thread->setStatus(Thread::EXIT);

		//INT ret = 0;
		//thread->exit(&ret);

		g_thread_lock.Lock() ;
		g_QuitThreadCount++ ;
		g_thread_lock.UnLock() ;

	} catch (...) {

	}

	return NULL;	// avoid compiler's warning
}

#endif

Chan.h

#include "Thread.h"

/* 
 * File:   Chain.h
 * Author: Vicky.H
 * Email:  eclipser@163.com
 *
 * Created on 2014年4月22日, 下午3:46
 */
#pragma once
#include "Thread.h"
#include <list>

template<typename T>
class ICommand {
public:
    virtual void action (T* pResult) {}
};

extern bool g_ChanState;

extern VLock g_ChanStateLock;


// 通道
template<typename T>
class Chan : public Thread {

	template<typename E>
	friend void operator< (Chan<E>& chan, ICommand<E>* pCommand);

	template<typename E>
	friend bool operator> (Chan<E>& chan, E** ppResult);
public:

    Chan() {
        start();
    }

    void run() {

		g_ChanStateLock.Lock();
		bool _chanState = g_ChanState;
		g_ChanStateLock.UnLock();

        while (_chanState) {
            m_Lock.Lock();
            std::list<ICommand<T>*>::iterator it = m_Commands.begin();
			while (it != m_Commands.end()) {
				T* pResult = new T;
				(*it)->action(pResult);
				m_Results.push_back(pResult);
				it++; 
			}
			m_Commands.clear();
                
            VSleep(10L);
            m_Lock.UnLock();

			g_ChanStateLock.Lock();
			_chanState = g_ChanState;
			g_ChanStateLock.UnLock();
        }
    }

		
private:
    VLock m_Lock;
	std::list<T*> m_Results;
    std::list<ICommand<T> *> m_Commands;
};

	
template<typename E>
void operator< (Chan<E>& chan, ICommand<E>* pCommand) {
	chan.m_Lock.Lock();
	chan.m_Commands.push_back(pCommand);
	chan.m_Lock.UnLock();
}

template<typename E>
bool operator> (Chan<E>& chan, E** ppResult) {
	for (;;) {
		chan.m_Lock.Lock();
		if (!chan.m_Results.empty()) {
			break;
		}
		chan.m_Lock.UnLock();

		VSleep(10L);
	}

	AutoLock_T __lock(chan.m_Lock);
	if (chan.m_Results.empty()) {
		return false;
	}
	*ppResult = *(chan.m_Results.begin());
	chan.m_Results.pop_front();
	return true;
}

Chan.cpp

#include "Thread.h"

/* 
 * File:   Chain.h
 * Author: Vicky.H
 * Email:  eclipser@163.com
 *
 * Created on 2014年4月22日, 下午3:46
 */
#pragma once
#include "Thread.h"
#include <list>

template<typename T>
class ICommand {
public:
    virtual void action (T* pResult) {}
};

extern bool g_ChanState;

extern VLock g_ChanStateLock;


// 通道
template<typename T>
class Chan : public Thread {

	template<typename E>
	friend void operator< (Chan<E>& chan, ICommand<E>* pCommand);

	template<typename E>
	friend bool operator> (Chan<E>& chan, E** ppResult);
public:

    Chan() {
        start();
    }

    void run() {

		g_ChanStateLock.Lock();
		bool _chanState = g_ChanState;
		g_ChanStateLock.UnLock();

        while (_chanState) {
            m_Lock.Lock();
            std::list<ICommand<T>*>::iterator it = m_Commands.begin();
			while (it != m_Commands.end()) {
				T* pResult = new T;
				(*it)->action(pResult);
				m_Results.push_back(pResult);
				it++; 
			}
			m_Commands.clear();
                
            VSleep(10L);
            m_Lock.UnLock();

			g_ChanStateLock.Lock();
			_chanState = g_ChanState;
			g_ChanStateLock.UnLock();
        }
    }

		
private:
    VLock m_Lock;
	std::list<T*> m_Results;
    std::list<ICommand<T> *> m_Commands;
};

	
template<typename E>
void operator< (Chan<E>& chan, ICommand<E>* pCommand) {
	chan.m_Lock.Lock();
	chan.m_Commands.push_back(pCommand);
	chan.m_Lock.UnLock();
}

template<typename E>
bool operator> (Chan<E>& chan, E** ppResult) {
	for (;;) {
		chan.m_Lock.Lock();
		if (!chan.m_Results.empty()) {
			break;
		}
		chan.m_Lock.UnLock();

		VSleep(10L);
	}

	AutoLock_T __lock(chan.m_Lock);
	if (chan.m_Results.empty()) {
		return false;
	}
	*ppResult = *(chan.m_Results.begin());
	chan.m_Results.pop_front();
	return true;
}


简单测试:

main.cpp

/* 
 * File:   main.cpp
 * Author: Vicky.H
 * Email:  eclipser@163.com
 */
#include "Chan.h"
#include <iostream>
#include <thread>

extern bool g_ChanState;

void exitHook() {
    g_ChanState = false;
}

template<>
class ICommand<int> {
public:
	
	virtual void action (int* pResult) {
		std::cout << "Thread:" << VGetCurrentThreadId( ) << " SayHelloChain action ..." << std::endl;
		*pResult = i++;
		// VSleep(1000L);
	}
private:
	static int i;
};

int ICommand<int>::i = 0;


class AddCommand : public ICommand<int> {
public:
	AddCommand(int i, int j) :i (i), j(j) {
	}

	void action (int* pResult) {
		if (i > j) {
			int _tmp = i;
			i = j;
			j = _tmp;
		}

		int _sum = 0;
		for (int index = i; index < j; index++) {
			_sum += index;
		}
		*pResult = _sum;

		std::cout << "sum : " << _sum << std::endl;
	}
private:
	int i;
	int j;
};


/*
 * 
 */
int main(void) {
	// 测试1
//     Chain<int> chan1;
// 	
// 	ICommand<int> *pC1 = new ICommand<int>;
// 	ICommand<int> *pC2 = new ICommand<int>;
// 	ICommand<int> *pC3 = new ICommand<int>;
// 	ICommand<int> *pC4 = new ICommand<int>;
// 	ICommand<int> *pC5 = new ICommand<int>;
// 	ICommand<int> *pC6 = new ICommand<int>;
// 	chan1 < pC1;
// 	chan1 < pC2;
// 	chan1 < pC3;
// 	chan1 < pC4;
// 	chan1 < pC5;
// 	chan1 < pC6;
// 
// 	std::thread t1([&chan1](){
// 		int* pResult = new int;
// 		while (chan1 > &pResult) {
// 			std::cout << "result : " << *pResult << std::endl;
// 		}
// 	});
// 	t1.detach();

	// 测试2
// 	Chain<int> chan2;
// 	AddCommand* p1 = new AddCommand(0, 100);
// 	AddCommand* p2 = new AddCommand(101, 200);
// 	AddCommand* p3 = new AddCommand(201, 300);
// 	AddCommand* p4 = new AddCommand(301, 400);
// 	AddCommand* p5 = new AddCommand(401, 500);
// 	AddCommand* p6 = new AddCommand(501, 600);
// 	AddCommand* p7 = new AddCommand(601, 700);
// 	AddCommand* p8 = new AddCommand(701, 800);
// 	AddCommand* p9 = new AddCommand(801, 900);
// 	AddCommand* p0 = new AddCommand(901, 1000);
// 
// 	chan2 < p1;
// 	chan2 < p2;
// 	chan2 < p3;
// 	chan2 < p4;
// 	chan2 < p5;
// 	chan2 < p6;
// 	chan2 < p7;
// 	chan2 < p8;
// 	chan2 < p9;
// 	chan2 < p0;
// 
// 	std::thread t2([&chan2](){
// 		int sum = 0;
// 		int* pResult = new int;
// 		while (chan2 > &pResult) {
// 			sum += *pResult;
// 			std::cout << "sum : " << sum << std::endl;
// 		}
// 	});
// 	t2.detach();

	// 测试3
	Chan<int> chan1;
	Chan<int> chan2;
	Chan<int> chan3;

	AddCommand* p1 = new AddCommand(0, 100);
	AddCommand* p2 = new AddCommand(101, 200);
	AddCommand* p3 = new AddCommand(201, 300);
	
	int sum = 0;

	std::thread t1([&chan1, &sum](){
		
		int* pResult = new int;
		while (chan1 > &pResult) {
			sum += *pResult;
			// std::cout << "sum 1 : " << sum << std::endl;
		}
	});
	t1.detach();

	std::thread t2([&chan2, &sum](){
		int* pResult = new int;
		while (chan2 > &pResult) {
			sum += *pResult;
			// std::cout << "sum 2 : " << sum << std::endl;
		}
	});
	t2.detach();

	std::thread t3([&chan3, &sum](){
		
		int* pResult = new int;
		while (chan3 > &pResult) {
			sum += *pResult;
			// std::cout << "sum 3 : " << sum << std::endl;
		}
	});
	t3.detach();
	
	chan1 < p1;
	chan2 < p2;
	chan3 < p3;



	// 执行 < 很快,但执行 > 却非常慢!!!需要优化。可以考虑回调结构,或者pipe?

	VSleep(500L);
    
	// 打印主函数线程
	for (;;) {
		// std::cout << "Thread:" << VGetCurrentThreadId( ) << std::endl;
		VSleep(1000L);
	}

	atexit(exitHook);
	return 0;
}












有疑问加站长微信联系(非本文作者)

本文来自:CSDN博客

感谢作者:eclipser1987

查看原文:C++实现golang chan 版本一

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2207 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传