1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "threadproxy.h"
#include "common.h"
#include <dlfcn.h>
#include <pthread.h>
#include <cstdio>
namespace vespamalloc {
IAllocator * _G_myMemP = nullptr;
void setAllocatorForThreads(IAllocator * allocator)
{
_G_myMemP = allocator;
}
}
extern "C" {
typedef void * (*VoidpFunctionVoidp) (void *);
class ThreadArg
{
public:
ThreadArg(VoidpFunctionVoidp func, void * arg) : _func(func), _arg(arg) { }
VoidpFunctionVoidp _func;
void * _arg;
};
typedef int (*pthread_create_function) (pthread_t *thread,
const pthread_attr_t *attr,
VoidpFunctionVoidp start_routine,
void *arg);
int linuxthreads_pthread_getattr_np(pthread_t pid, pthread_attr_t *dst);
static void * _G_mallocThreadProxyReturnAddress = nullptr;
static std::atomic<size_t> _G_threadCount(1); // You always have the main thread.
static void cleanupThread(void * arg)
{
ThreadArg * ta = (ThreadArg *) arg;
delete ta;
vespamalloc::_G_myMemP->quitThisThread();
vespamalloc::Mutex::subThread();
_G_threadCount.fetch_sub(1);
}
void * mallocThreadProxy (void * arg)
{
ThreadArg * ta = (ThreadArg *) arg;
void * tempReturnAddress = __builtin_return_address(0);
ASSERT_STACKTRACE((_G_mallocThreadProxyReturnAddress == nullptr) || (_G_mallocThreadProxyReturnAddress == tempReturnAddress));
_G_mallocThreadProxyReturnAddress = tempReturnAddress;
vespamalloc::_G_myMemP->setReturnAddressStop(tempReturnAddress);
vespamalloc::Mutex::addThread();
vespamalloc::_G_myMemP->initThisThread();
void * result = nullptr;
DEBUG(fprintf(stderr, "arg(%p=%p), local(%p=%p)\n", &arg, arg, &ta, ta));
pthread_cleanup_push(cleanupThread, ta);
result = (*ta->_func)(ta->_arg);
pthread_cleanup_pop(1);
return result;
}
extern "C" VESPA_DLL_EXPORT int
local_pthread_create (pthread_t *thread,
const pthread_attr_t *attrOrg,
void * (*start_routine) (void *),
void * arg) __asm__("pthread_create");
VESPA_DLL_EXPORT int
local_pthread_create (pthread_t *thread,
const pthread_attr_t *attrOrg,
void * (*start_routine) (void *),
void * arg)
{
size_t numThreads = _G_threadCount;
while ((numThreads < vespamalloc::_G_myMemP->getMaxNumThreads())
&& ! _G_threadCount.compare_exchange_strong(numThreads, numThreads+1))
{ }
if (numThreads >= vespamalloc::_G_myMemP->getMaxNumThreads()) {
#if 1
// Just abort when max threads are reached.
// Future option is to make the behaviour optional.
fprintf (stderr, "All %ld threads are active! Aborting so you can start again.\n", numThreads);
abort();
#else
return EAGAIN;
#endif
}
// A pointer to the library version of pthread_create.
static pthread_create_function real_pthread_create = nullptr;
const char * pthread_createName = "pthread_create";
if (real_pthread_create == nullptr) {
real_pthread_create = (pthread_create_function) dlsym (RTLD_NEXT, pthread_createName);
if (real_pthread_create == nullptr) {
fprintf (stderr, "Could not find the pthread_create function!\n");
abort();
}
}
ThreadArg * args = new ThreadArg(start_routine, arg);
pthread_attr_t locAttr;
pthread_attr_t *attr(const_cast<pthread_attr_t *>(attrOrg));
if (attr == nullptr) {
pthread_attr_init(&locAttr);
attr = &locAttr;
}
vespamalloc::_G_myMemP->enableThreadSupport();
int retval = (*real_pthread_create)(thread, attr, mallocThreadProxy, args);
return retval;
}
}
|