Line data Source code
1 : /**
2 : * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0.
4 : */
5 :
6 : #if !defined(__MACH__)
7 : # define _GNU_SOURCE
8 : #endif
9 :
10 : #include <aws/common/clock.h>
11 : #include <aws/common/linked_list.h>
12 : #include <aws/common/logging.h>
13 : #include <aws/common/private/dlloads.h>
14 : #include <aws/common/private/thread_shared.h>
15 : #include <aws/common/thread.h>
16 :
17 : #include <dlfcn.h>
18 : #include <errno.h>
19 : #include <inttypes.h>
20 : #include <limits.h>
21 : #include <sched.h>
22 : #include <time.h>
23 : #include <unistd.h>
24 :
25 : #if defined(__FreeBSD__) || defined(__NETBSD__)
26 : # include <pthread_np.h>
27 : typedef cpuset_t cpu_set_t;
28 : #endif
29 :
30 : #if !defined(AWS_AFFINITY_METHOD)
31 : # error "Must provide a method for setting thread affinity"
32 : #endif
33 :
34 : // Possible methods for setting thread affinity
35 : #define AWS_AFFINITY_METHOD_NONE 0
36 : #define AWS_AFFINITY_METHOD_PTHREAD_ATTR 1
37 : #define AWS_AFFINITY_METHOD_PTHREAD 2
38 :
39 : // Ensure provided affinity method matches one of the supported values
40 : // clang-format off
41 : #if AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_NONE \
42 : && AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_PTHREAD_ATTR \
43 : && AWS_AFFINITY_METHOD != AWS_AFFINITY_METHOD_PTHREAD
44 : // clang-format on
45 : # error "Invalid thread affinity method"
46 : #endif
47 :
/* Process-wide defaults handed out by aws_default_thread_options(). */
static struct aws_thread_options s_default_options = {
    /* this will make sure platform default stack size is used. */
    .stack_size = 0,
    /* -1 means "no cpu affinity requested" (see aws_thread_launch). */
    .cpu_id = -1,
    .join_strategy = AWS_TJS_MANUAL,
};
54 :
/* Node in a thread's singly-linked list of at-exit callbacks (registered via aws_thread_current_at_exit). */
struct thread_atexit_callback {
    aws_thread_atexit_fn *callback;
    void *user_data;
    /* previously registered callback; NULL terminates the list */
    struct thread_atexit_callback *next;
};
60 :
/*
 * Heap-allocated bundle passed as the start argument to the pthread. Carries the
 * user entry point, the at-exit callback list, and the bookkeeping needed by the
 * managed-thread lazy-join system.
 */
struct thread_wrapper {
    struct aws_allocator *allocator;
    /* linkage for the pending-join list used by managed threads */
    struct aws_linked_list_node node;
    /* user-supplied thread entry point and its argument */
    void (*func)(void *arg);
    void *arg;
    /* head of this thread's at-exit callback list; callbacks run newest-first in thread_fn */
    struct thread_atexit_callback *atexit;
    /* scratch slots used by aws_thread_call_once's pthread_once trampoline (s_call_once) */
    void (*call_once)(void *);
    void *once_arg;

    /*
     * The managed thread system does lazy joins on threads once finished via their wrapper. For that to work
     * we need something to join against, so we keep a by-value copy of the original thread here. The tricky part
     * is how to set the threadid/handle of this copy since the copy must be injected into the thread function before
     * the threadid/handle is known. We get around that by just querying it at the top of the wrapper thread function.
     */
    struct aws_thread thread_copy;
    /* true when a cpu_id was specified at launch; triggers the set_mempolicy() call in thread_fn */
    bool membind;
};
79 :
80 : static AWS_THREAD_LOCAL struct thread_wrapper *tl_wrapper = NULL;
81 :
82 : /*
83 : * thread_wrapper is platform-dependent so this function ends up being duplicated in each thread implementation
84 : */
85 0 : void aws_thread_join_and_free_wrapper_list(struct aws_linked_list *wrapper_list) {
86 0 : struct aws_linked_list_node *iter = aws_linked_list_begin(wrapper_list);
87 0 : while (iter != aws_linked_list_end(wrapper_list)) {
88 0 :
89 0 : struct thread_wrapper *join_thread_wrapper = AWS_CONTAINER_OF(iter, struct thread_wrapper, node);
90 0 :
91 0 : /*
92 0 : * Can't do a for-loop since we need to advance to the next wrapper before we free the wrapper
93 0 : */
94 0 : iter = aws_linked_list_next(iter);
95 0 :
96 0 : join_thread_wrapper->thread_copy.detach_state = AWS_THREAD_JOINABLE;
97 0 : aws_thread_join(&join_thread_wrapper->thread_copy);
98 0 : aws_mem_release(join_thread_wrapper->allocator, join_thread_wrapper);
99 0 :
100 0 : aws_thread_decrement_unjoined_count();
101 0 : }
102 0 : }
103 :
/*
 * pthread entry point. Records the real thread id into the wrapper's aws_thread
 * copy, optionally sets the NUMA memory policy, runs the user function, fires
 * at-exit callbacks, and finally either frees the wrapper (unmanaged threads) or
 * queues it for lazy join (managed threads).
 */
static void *thread_fn(void *arg) {
    struct thread_wrapper *wrapper_ptr = arg;

    /*
     * Make sure the aws_thread copy has the right thread id stored in it.
     */
    wrapper_ptr->thread_copy.thread_id = aws_thread_current_thread_id();

    /* Work from a by-value copy; the heap wrapper may be freed/queued before we finish below. */
    struct thread_wrapper wrapper = *wrapper_ptr;
    struct aws_allocator *allocator = wrapper.allocator;
    tl_wrapper = &wrapper;

    if (wrapper.membind && g_set_mempolicy_ptr) {
        AWS_LOGF_INFO(
            AWS_LS_COMMON_THREAD,
            "a cpu affinity was specified when launching this thread and set_mempolicy() is available on this "
            "system. Setting the memory policy to MPOL_PREFERRED");
        /* if a user set a cpu id in their thread options, we're going to make sure the numa policy honors that
         * and makes sure the numa node of the cpu we launched this thread on is where memory gets allocated. However,
         * we don't want to fail the application if this fails, so make the call, and ignore the result. */
        long resp = g_set_mempolicy_ptr(AWS_MPOL_PREFERRED_ALIAS, NULL, 0);
        if (resp) {
            AWS_LOGF_WARN(AWS_LS_COMMON_THREAD, "call to set_mempolicy() failed with errno %d", errno);
        }
    }
    wrapper.func(wrapper.arg);

    /*
     * Managed threads don't free the wrapper yet. The thread management system does it later after the thread
     * is joined.
     */
    bool is_managed_thread = wrapper.thread_copy.detach_state == AWS_THREAD_MANAGED;
    if (!is_managed_thread) {
        aws_mem_release(allocator, arg);
    }

    /* Run at-exit callbacks newest-first; each node is released before its callback is invoked. */
    struct thread_atexit_callback *exit_callback_data = wrapper.atexit;
    while (exit_callback_data) {
        aws_thread_atexit_fn *exit_callback = exit_callback_data->callback;
        void *exit_callback_user_data = exit_callback_data->user_data;
        struct thread_atexit_callback *next_exit_callback_data = exit_callback_data->next;

        aws_mem_release(allocator, exit_callback_data);

        exit_callback(exit_callback_user_data);
        exit_callback_data = next_exit_callback_data;
    }
    tl_wrapper = NULL;

    /*
     * Release this thread to the managed thread system for lazy join.
     */
    if (is_managed_thread) {
        aws_thread_pending_join_add(&wrapper_ptr->node);
    }

    return NULL;
}
162 :
163 0 : const struct aws_thread_options *aws_default_thread_options(void) {
164 0 : return &s_default_options;
165 0 : }
166 :
167 0 : void aws_thread_clean_up(struct aws_thread *thread) {
168 0 : if (thread->detach_state == AWS_THREAD_JOINABLE) {
169 0 : pthread_detach(thread->thread_id);
170 0 : }
171 0 : }
172 :
173 0 : static void s_call_once(void) {
174 0 : tl_wrapper->call_once(tl_wrapper->once_arg);
175 0 : }
176 :
/*
 * Wraps pthread_once so the once-callback can receive an argument: the callback and
 * user_data are stashed in thread-local storage where s_call_once retrieves them.
 */
void aws_thread_call_once(aws_thread_once *flag, void (*call_once)(void *), void *user_data) {
    // If this is a non-aws_thread, then gin up a temp thread wrapper
    struct thread_wrapper temp_wrapper;
    if (!tl_wrapper) {
        tl_wrapper = &temp_wrapper;
    }

    tl_wrapper->call_once = call_once;
    tl_wrapper->once_arg = user_data;
    pthread_once(flag, s_call_once);

    // Restore the thread-local to NULL if we substituted the stack temp above.
    if (tl_wrapper == &temp_wrapper) {
        tl_wrapper = NULL;
    }
}
192 :
193 0 : int aws_thread_init(struct aws_thread *thread, struct aws_allocator *allocator) {
194 0 : *thread = (struct aws_thread){.allocator = allocator, .detach_state = AWS_THREAD_NOT_CREATED};
195 0 :
196 0 : return AWS_OP_SUCCESS;
197 0 : }
198 :
199 : int aws_thread_launch(
200 : struct aws_thread *thread,
201 : void (*func)(void *arg),
202 : void *arg,
203 0 : const struct aws_thread_options *options) {
204 0 :
205 0 : pthread_attr_t attributes;
206 0 : pthread_attr_t *attributes_ptr = NULL;
207 0 : int attr_return = 0;
208 0 : int allocation_failed = 0;
209 0 : bool is_managed_thread = options != NULL && options->join_strategy == AWS_TJS_MANAGED;
210 0 : if (is_managed_thread) {
211 0 : thread->detach_state = AWS_THREAD_MANAGED;
212 0 : }
213 0 :
214 0 : if (options) {
215 0 : attr_return = pthread_attr_init(&attributes);
216 0 :
217 0 : if (attr_return) {
218 0 : goto cleanup;
219 0 : }
220 0 :
221 0 : attributes_ptr = &attributes;
222 0 :
223 0 : if (options->stack_size > PTHREAD_STACK_MIN) {
224 0 : attr_return = pthread_attr_setstacksize(attributes_ptr, options->stack_size);
225 0 :
226 0 : if (attr_return) {
227 0 : goto cleanup;
228 0 : }
229 0 : }
230 0 :
231 0 : /* AFAIK you can't set thread affinity on apple platforms, and it doesn't really matter since all memory
232 0 : * NUMA or not is setup in interleave mode.
233 0 : * Thread afinity is also not supported on Android systems, and honestly, if you're running android on a NUMA
234 0 : * configuration, you've got bigger problems. */
235 0 : #if AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD_ATTR
236 0 : if (options->cpu_id >= 0) {
237 0 : AWS_LOGF_INFO(
238 0 : AWS_LS_COMMON_THREAD,
239 0 : "id=%p: cpu affinity of cpu_id %d was specified, attempting to honor the value.",
240 0 : (void *)thread,
241 0 : options->cpu_id);
242 0 :
243 0 : cpu_set_t cpuset;
244 0 : CPU_ZERO(&cpuset);
245 0 : CPU_SET((uint32_t)options->cpu_id, &cpuset);
246 0 :
247 0 : attr_return = pthread_attr_setaffinity_np(attributes_ptr, sizeof(cpuset), &cpuset);
248 0 :
249 0 : if (attr_return) {
250 0 : AWS_LOGF_ERROR(
251 0 : AWS_LS_COMMON_THREAD,
252 0 : "id=%p: pthread_attr_setaffinity_np() failed with %d.",
253 0 : (void *)thread,
254 0 : errno);
255 0 : goto cleanup;
256 0 : }
257 0 : }
258 0 : #endif /* AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD_ATTR */
259 0 : }
260 0 :
261 0 : struct thread_wrapper *wrapper =
262 0 : (struct thread_wrapper *)aws_mem_calloc(thread->allocator, 1, sizeof(struct thread_wrapper));
263 0 :
264 0 : if (!wrapper) {
265 0 : allocation_failed = 1;
266 0 : goto cleanup;
267 0 : }
268 0 :
269 0 : if (options && options->cpu_id >= 0) {
270 0 : wrapper->membind = true;
271 0 : }
272 0 :
273 0 : wrapper->thread_copy = *thread;
274 0 : wrapper->allocator = thread->allocator;
275 0 : wrapper->func = func;
276 0 : wrapper->arg = arg;
277 0 :
278 0 : /*
279 0 : * Increment the count prior to spawning the thread. Decrement back if the create failed.
280 0 : */
281 0 : if (is_managed_thread) {
282 0 : aws_thread_increment_unjoined_count();
283 0 : }
284 0 :
285 0 : attr_return = pthread_create(&thread->thread_id, attributes_ptr, thread_fn, (void *)wrapper);
286 0 :
287 0 : if (attr_return) {
288 0 : if (is_managed_thread) {
289 0 : aws_thread_decrement_unjoined_count();
290 0 : }
291 0 : goto cleanup;
292 0 : }
293 0 :
294 : #if AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD
295 : /* If we don't have pthread_attr_setaffinity_np, we may
296 : * still be able to set the thread affinity after creation. */
297 : if (options && options->cpu_id >= 0) {
298 : AWS_LOGF_INFO(
299 : AWS_LS_COMMON_THREAD,
300 : "id=%p: cpu affinity of cpu_id %d was specified, attempting to honor the value.",
301 : (void *)thread,
302 : options->cpu_id);
303 :
304 : cpu_set_t cpuset;
305 : CPU_ZERO(&cpuset);
306 : CPU_SET((uint32_t)options->cpu_id, &cpuset);
307 :
308 : attr_return = pthread_setaffinity_np(thread->thread_id, sizeof(cpuset), &cpuset);
309 : if (attr_return) {
310 : AWS_LOGF_ERROR(
311 : AWS_LS_COMMON_THREAD, "id=%p: pthread_setaffinity_np() failed with %d.", (void *)thread, errno);
312 : goto cleanup;
313 : }
314 : }
315 : #endif /* AWS_AFFINITY_METHOD == AWS_AFFINITY_METHOD_PTHREAD */
316 : /*
317 0 : * Managed threads need to stay unjoinable from an external perspective. We'll handle it after thread function
318 0 : * completion.
319 0 : */
320 0 : if (!is_managed_thread) {
321 0 : thread->detach_state = AWS_THREAD_JOINABLE;
322 0 : }
323 0 :
324 0 : cleanup:
325 0 : if (attributes_ptr) {
326 0 : pthread_attr_destroy(attributes_ptr);
327 0 : }
328 0 :
329 0 : if (attr_return == EINVAL) {
330 0 : return aws_raise_error(AWS_ERROR_THREAD_INVALID_SETTINGS);
331 0 : }
332 0 :
333 0 : if (attr_return == EAGAIN) {
334 0 : return aws_raise_error(AWS_ERROR_THREAD_INSUFFICIENT_RESOURCE);
335 0 : }
336 0 :
337 0 : if (attr_return == EPERM) {
338 0 : return aws_raise_error(AWS_ERROR_THREAD_NO_PERMISSIONS);
339 0 : }
340 0 :
341 0 : if (allocation_failed || attr_return == ENOMEM) {
342 0 : return aws_raise_error(AWS_ERROR_OOM);
343 0 : }
344 0 :
345 0 : return AWS_OP_SUCCESS;
346 0 : }
347 :
348 0 : aws_thread_id_t aws_thread_get_id(struct aws_thread *thread) {
349 0 : return thread->thread_id;
350 0 : }
351 :
352 0 : enum aws_thread_detach_state aws_thread_get_detach_state(struct aws_thread *thread) {
353 0 : return thread->detach_state;
354 0 : }
355 :
356 0 : int aws_thread_join(struct aws_thread *thread) {
357 0 : if (thread->detach_state == AWS_THREAD_JOINABLE) {
358 0 : int err_no = pthread_join(thread->thread_id, 0);
359 0 :
360 0 : if (err_no) {
361 0 : if (err_no == EINVAL) {
362 0 : return aws_raise_error(AWS_ERROR_THREAD_NOT_JOINABLE);
363 0 : }
364 0 : if (err_no == ESRCH) {
365 0 : return aws_raise_error(AWS_ERROR_THREAD_NO_SUCH_THREAD_ID);
366 0 : }
367 0 : if (err_no == EDEADLK) {
368 0 : return aws_raise_error(AWS_ERROR_THREAD_DEADLOCK_DETECTED);
369 0 : }
370 0 : }
371 0 :
372 0 : thread->detach_state = AWS_THREAD_JOIN_COMPLETED;
373 0 : }
374 0 :
375 0 : return AWS_OP_SUCCESS;
376 0 : }
377 :
378 0 : aws_thread_id_t aws_thread_current_thread_id(void) {
379 0 : return pthread_self();
380 0 : }
381 :
382 0 : bool aws_thread_thread_id_equal(aws_thread_id_t t1, aws_thread_id_t t2) {
383 0 : return pthread_equal(t1, t2) != 0;
384 0 : }
385 :
386 0 : void aws_thread_current_sleep(uint64_t nanos) {
387 0 : uint64_t nano = 0;
388 0 : time_t seconds = (time_t)aws_timestamp_convert(nanos, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &nano);
389 0 :
390 0 : struct timespec tm = {
391 0 : .tv_sec = seconds,
392 0 : .tv_nsec = (long)nano,
393 0 : };
394 0 : struct timespec output;
395 0 :
396 0 : nanosleep(&tm, &output);
397 0 : }
398 :
399 0 : int aws_thread_current_at_exit(aws_thread_atexit_fn *callback, void *user_data) {
400 0 : if (!tl_wrapper) {
401 0 : return aws_raise_error(AWS_ERROR_THREAD_NOT_JOINABLE);
402 0 : }
403 0 :
404 0 : struct thread_atexit_callback *cb = aws_mem_calloc(tl_wrapper->allocator, 1, sizeof(struct thread_atexit_callback));
405 0 : if (!cb) {
406 0 : return AWS_OP_ERR;
407 0 : }
408 0 : cb->callback = callback;
409 0 : cb->user_data = user_data;
410 0 : cb->next = tl_wrapper->atexit;
411 0 : tl_wrapper->atexit = cb;
412 0 : return AWS_OP_SUCCESS;
413 0 : }
|