Port Thread* methods to boost::thread_group

This commit is contained in:
Gavin Andresen
2013-03-06 22:31:26 -05:00
parent 72f14d26ec
commit 21eb5adadb
10 changed files with 134 additions and 494 deletions

View File

@@ -26,14 +26,6 @@ using namespace boost;
static const int MAX_OUTBOUND_CONNECTIONS = 8;
void ThreadMessageHandler2(void* parg);
void ThreadSocketHandler2(void* parg);
void ThreadOpenConnections2(void* parg);
void ThreadOpenAddedConnections2(void* parg);
#ifdef USE_UPNP
void ThreadMapPort2(void* parg);
#endif
void ThreadDNSAddressSeed2(void* parg);
bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false);
@@ -46,7 +38,6 @@ struct LocalServiceInfo {
// Global state variables
//
bool fDiscover = true;
bool fUseUPnP = false;
uint64 nLocalServices = NODE_NETWORK;
static CCriticalSection cs_mapLocalHost;
static map<CNetAddr, LocalServiceInfo> mapLocalHost;
@@ -754,32 +745,10 @@ void SocketSendData(CNode *pnode)
pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it);
}
void ThreadSocketHandler(void* parg)
{
// Make this thread recognisable as the networking thread
RenameThread("bitcoin-net");
try
{
vnThreadsRunning[THREAD_SOCKETHANDLER]++;
ThreadSocketHandler2(parg);
vnThreadsRunning[THREAD_SOCKETHANDLER]--;
}
catch (std::exception& e) {
vnThreadsRunning[THREAD_SOCKETHANDLER]--;
PrintException(&e, "ThreadSocketHandler()");
} catch (...) {
vnThreadsRunning[THREAD_SOCKETHANDLER]--;
throw; // support pthread_cancel()
}
printf("ThreadSocketHandler exited\n");
}
static list<CNode*> vNodesDisconnected;
void ThreadSocketHandler2(void* parg)
void ThreadSocketHandler()
{
printf("ThreadSocketHandler started\n");
unsigned int nPrevNodeCount = 0;
loop
{
@@ -892,12 +861,10 @@ void ThreadSocketHandler2(void* parg)
}
}
vnThreadsRunning[THREAD_SOCKETHANDLER]--;
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
vnThreadsRunning[THREAD_SOCKETHANDLER]++;
if (fShutdown)
return;
boost::this_thread::interruption_point();
if (nSelect == SOCKET_ERROR)
{
if (have_fds)
@@ -984,8 +951,7 @@ void ThreadSocketHandler2(void* parg)
}
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (fShutdown)
return;
boost::this_thread::interruption_point();
//
// Receive
@@ -1089,31 +1055,8 @@ void ThreadSocketHandler2(void* parg)
#ifdef USE_UPNP
void ThreadMapPort(void* parg)
void ThreadMapPort()
{
// Make this thread recognisable as the UPnP thread
RenameThread("bitcoin-UPnP");
try
{
vnThreadsRunning[THREAD_UPNP]++;
ThreadMapPort2(parg);
vnThreadsRunning[THREAD_UPNP]--;
}
catch (std::exception& e) {
vnThreadsRunning[THREAD_UPNP]--;
PrintException(&e, "ThreadMapPort()");
} catch (...) {
vnThreadsRunning[THREAD_UPNP]--;
PrintException(NULL, "ThreadMapPort()");
}
printf("ThreadMapPort exited\n");
}
void ThreadMapPort2(void* parg)
{
printf("ThreadMapPort started\n");
std::string port = strprintf("%u", GetListenPort());
const char * multicastif = 0;
const char * minissdpdpath = 0;
@@ -1154,33 +1097,9 @@ void ThreadMapPort2(void* parg)
}
string strDesc = "Bitcoin " + FormatFullVersion();
#ifndef UPNPDISCOVER_SUCCESS
/* miniupnpc 1.5 */
r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0);
#else
/* miniupnpc 1.6 */
r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0");
#endif
if(r!=UPNPCOMMAND_SUCCESS)
printf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n",
port.c_str(), port.c_str(), lanaddr, r, strupnperror(r));
else
printf("UPnP Port Mapping successful.\n");
int i = 1;
loop {
if (fShutdown || !fUseUPnP)
{
r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0);
printf("UPNP_DeletePortMapping() returned : %d\n", r);
freeUPNPDevlist(devlist); devlist = 0;
FreeUPNPUrls(&urls);
return;
}
if (i % 600 == 0) // Refresh every 20 minutes
{
try {
loop {
#ifndef UPNPDISCOVER_SUCCESS
/* miniupnpc 1.5 */
r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
@@ -1196,33 +1115,49 @@ void ThreadMapPort2(void* parg)
port.c_str(), port.c_str(), lanaddr, r, strupnperror(r));
else
printf("UPnP Port Mapping successful.\n");;
MilliSleep(20*60*1000); // Refresh every 20 minutes
}
MilliSleep(2000);
i++;
}
catch (boost::thread_interrupted)
{
r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0);
printf("UPNP_DeletePortMapping() returned : %d\n", r);
freeUPNPDevlist(devlist); devlist = 0;
FreeUPNPUrls(&urls);
throw;
}
} else {
printf("No valid UPnP IGDs found\n");
freeUPNPDevlist(devlist); devlist = 0;
if (r != 0)
FreeUPNPUrls(&urls);
loop {
if (fShutdown || !fUseUPnP)
return;
MilliSleep(2000);
}
}
}
void MapPort()
void MapPort(bool fUseUPnP)
{
if (fUseUPnP && vnThreadsRunning[THREAD_UPNP] < 1)
static boost::thread* upnp_thread = NULL;
if (fUseUPnP)
{
if (!NewThread(ThreadMapPort, NULL))
printf("Error: ThreadMapPort(ThreadMapPort) failed\n");
if (upnp_thread) {
upnp_thread->interrupt();
upnp_thread->join();
delete upnp_thread;
}
upnp_thread = new boost::thread(boost::bind(&TraceThread<boost::function<void()> >, "upnp", &ThreadMapPort));
}
else if (upnp_thread) {
upnp_thread->interrupt();
upnp_thread->join();
delete upnp_thread;
upnp_thread = NULL;
}
}
#else
void MapPort()
void MapPort(bool)
{
// Intentionally left blank.
}
@@ -1254,32 +1189,10 @@ static const char *strTestNetDNSSeed[][2] = {
{NULL, NULL}
};
void ThreadDNSAddressSeed(void* parg)
{
// Make this thread recognisable as the DNS seeding thread
RenameThread("bitcoin-dnsseed");
try
{
vnThreadsRunning[THREAD_DNSSEED]++;
ThreadDNSAddressSeed2(parg);
vnThreadsRunning[THREAD_DNSSEED]--;
}
catch (std::exception& e) {
vnThreadsRunning[THREAD_DNSSEED]--;
PrintException(&e, "ThreadDNSAddressSeed()");
} catch (...) {
vnThreadsRunning[THREAD_DNSSEED]--;
throw; // support pthread_cancel()
}
printf("ThreadDNSAddressSeed exited\n");
}
void ThreadDNSAddressSeed2(void* parg)
void ThreadDNSAddressSeed()
{
static const char *(*strDNSSeed)[2] = fTestNet ? strTestNetDNSSeed : strMainNetDNSSeed;
printf("ThreadDNSAddressSeed started\n");
int found = 0;
printf("Loading addresses from DNS seeds (could take a while)\n");
@@ -1409,57 +1322,6 @@ void DumpAddresses()
addrman.size(), GetTimeMillis() - nStart);
}
void ThreadDumpAddress2(void* parg)
{
printf("ThreadDumpAddress started\n");
vnThreadsRunning[THREAD_DUMPADDRESS]++;
while (!fShutdown)
{
DumpAddresses();
vnThreadsRunning[THREAD_DUMPADDRESS]--;
MilliSleep(100000);
vnThreadsRunning[THREAD_DUMPADDRESS]++;
}
vnThreadsRunning[THREAD_DUMPADDRESS]--;
}
void ThreadDumpAddress(void* parg)
{
// Make this thread recognisable as the address dumping thread
RenameThread("bitcoin-adrdump");
try
{
ThreadDumpAddress2(parg);
}
catch (std::exception& e) {
PrintException(&e, "ThreadDumpAddress()");
}
printf("ThreadDumpAddress exited\n");
}
void ThreadOpenConnections(void* parg)
{
// Make this thread recognisable as the connection opening thread
RenameThread("bitcoin-opencon");
try
{
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
ThreadOpenConnections2(parg);
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
}
catch (std::exception& e) {
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
PrintException(&e, "ThreadOpenConnections()");
} catch (...) {
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
PrintException(NULL, "ThreadOpenConnections()");
}
printf("ThreadOpenConnections exited\n");
}
void static ProcessOneShot()
{
string strDest;
@@ -1478,10 +1340,8 @@ void static ProcessOneShot()
}
}
void ThreadOpenConnections2(void* parg)
void ThreadOpenConnections()
{
printf("ThreadOpenConnections started\n");
// Connect to specific addresses
if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0)
{
@@ -1495,8 +1355,6 @@ void ThreadOpenConnections2(void* parg)
for (int i = 0; i < 10 && i < nLoop; i++)
{
MilliSleep(500);
if (fShutdown)
return;
}
}
MilliSleep(500);
@@ -1509,18 +1367,10 @@ void ThreadOpenConnections2(void* parg)
{
ProcessOneShot();
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
MilliSleep(500);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return;
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
CSemaphoreGrant grant(*semOutbound);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return;
boost::this_thread::interruption_point();
// Add seed nodes if IRC isn't working
if (addrman.size()==0 && (GetTime() - nStart > 60) && !fTestNet)
@@ -1600,38 +1450,15 @@ void ThreadOpenConnections2(void* parg)
}
}
void ThreadOpenAddedConnections(void* parg)
void ThreadOpenAddedConnections()
{
// Make this thread recognisable as the connection opening thread
RenameThread("bitcoin-opencon");
try
{
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++;
ThreadOpenAddedConnections2(parg);
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
}
catch (std::exception& e) {
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
PrintException(&e, "ThreadOpenAddedConnections()");
} catch (...) {
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
PrintException(NULL, "ThreadOpenAddedConnections()");
}
printf("ThreadOpenAddedConnections exited\n");
}
void ThreadOpenAddedConnections2(void* parg)
{
printf("ThreadOpenAddedConnections started\n");
{
LOCK(cs_vAddedNodes);
vAddedNodes = mapMultiArgs["-addnode"];
}
if (HaveNameProxy()) {
while(!fShutdown) {
while(true) {
list<string> lAddresses(0);
{
LOCK(cs_vAddedNodes);
@@ -1643,14 +1470,9 @@ void ThreadOpenAddedConnections2(void* parg)
CSemaphoreGrant grant(*semOutbound);
OpenNetworkConnection(addr, &grant, strAddNode.c_str());
MilliSleep(500);
if (fShutdown)
return;
}
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
MilliSleep(120000); // Retry every 2 minutes
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++;
}
return;
}
for (unsigned int i = 0; true; i++)
@@ -1695,16 +1517,8 @@ void ThreadOpenAddedConnections2(void* parg)
CSemaphoreGrant grant(*semOutbound);
OpenNetworkConnection(CAddress(vserv[i % vserv.size()]), &grant);
MilliSleep(500);
if (fShutdown)
return;
}
if (fShutdown)
return;
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
MilliSleep(120000); // Retry every 2 minutes
vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++;
if (fShutdown)
return;
}
}
@@ -1714,8 +1528,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu
//
// Initiate outbound network connection
//
if (fShutdown)
return false;
boost::this_thread::interruption_point();
if (!strDest)
if (IsLocal(addrConnect) ||
FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect) ||
@@ -1724,11 +1537,9 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu
if (strDest && FindNode(strDest))
return false;
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
CNode* pnode = ConnectNode(addrConnect, strDest);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return false;
boost::this_thread::interruption_point();
if (!pnode)
return false;
if (grantOutbound)
@@ -1746,33 +1557,10 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu
void ThreadMessageHandler(void* parg)
void ThreadMessageHandler()
{
// Make this thread recognisable as the message handling thread
RenameThread("bitcoin-msghand");
try
{
vnThreadsRunning[THREAD_MESSAGEHANDLER]++;
ThreadMessageHandler2(parg);
vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
}
catch (std::exception& e) {
vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
PrintException(&e, "ThreadMessageHandler()");
} catch (...) {
vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
PrintException(NULL, "ThreadMessageHandler()");
}
printf("ThreadMessageHandler exited\n");
}
void ThreadMessageHandler2(void* parg)
{
printf("ThreadMessageHandler started\n");
SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL);
while (!fShutdown)
while (true)
{
vector<CNode*> vNodesCopy;
{
@@ -1798,8 +1586,7 @@ void ThreadMessageHandler2(void* parg)
if (!ProcessMessages(pnode))
pnode->CloseSocketDisconnect();
}
if (fShutdown)
return;
boost::this_thread::interruption_point();
// Send messages
{
@@ -1807,8 +1594,7 @@ void ThreadMessageHandler2(void* parg)
if (lockSend)
SendMessages(pnode, pnode == pnodeTrickle);
}
if (fShutdown)
return;
boost::this_thread::interruption_point();
}
{
@@ -1817,16 +1603,7 @@ void ThreadMessageHandler2(void* parg)
pnode->Release();
}
// Wait and allow messages to bunch up.
// Reduce vnThreadsRunning so StopNode has permission to exit while
// we're sleeping, but we must always check fShutdown after doing this.
vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
MilliSleep(100);
if (fRequestShutdown)
StartShutdown();
vnThreadsRunning[THREAD_MESSAGEHANDLER]++;
if (fShutdown)
return;
}
}
@@ -2000,6 +1777,8 @@ void static Discover()
void StartNode(void* parg)
{
boost::thread_group* threadGroup = (boost::thread_group*)parg;
// Make this thread recognisable as the startup thread
RenameThread("bitcoin-start");
@@ -2021,38 +1800,32 @@ void StartNode(void* parg)
if (!GetBoolArg("-dnsseed", true))
printf("DNS seeding disabled\n");
else
if (!NewThread(ThreadDNSAddressSeed, NULL))
printf("Error: NewThread(ThreadDNSAddressSeed) failed\n");
threadGroup->create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", &ThreadDNSAddressSeed));
// Map ports with UPnP
if (fUseUPnP)
MapPort();
MapPort(GetBoolArg("-upnp", USE_UPNP));
// Send and receive from sockets, accept connections
if (!NewThread(ThreadSocketHandler, NULL))
printf("Error: NewThread(ThreadSocketHandler) failed\n");
threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler));
// Initiate outbound connections from -addnode
if (!NewThread(ThreadOpenAddedConnections, NULL))
printf("Error: NewThread(ThreadOpenAddedConnections) failed\n");
threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections));
// Initiate outbound connections
if (!NewThread(ThreadOpenConnections, NULL))
printf("Error: NewThread(ThreadOpenConnections) failed\n");
threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections));
// Process messages
if (!NewThread(ThreadMessageHandler, NULL))
printf("Error: NewThread(ThreadMessageHandler) failed\n");
threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler));
// Dump network addresses
if (!NewThread(ThreadDumpAddress, NULL))
printf("Error; NewThread(ThreadDumpAddress) failed\n");
threadGroup->create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, 10000));
}
bool StopNode()
{
printf("StopNode()\n");
GenerateBitcoins(false, NULL);
MapPort(false);
fShutdown = true;
nTransactionsUpdated++;
int64 nStart = GetTime();
@@ -2070,19 +1843,6 @@ bool StopNode()
break;
MilliSleep(20);
} while(true);
if (vnThreadsRunning[THREAD_SOCKETHANDLER] > 0) printf("ThreadSocketHandler still running\n");
if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
#ifdef USE_UPNP
if (vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
#endif
if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
MilliSleep(20);
MilliSleep(50);
DumpAddresses();