Mirror of https://github.com/cjdelisle/cjdns

Move lock into proximity of where the actual blocking epoll/poll/kqueue call is made.

Author: Caleb James DeLisle
Date: 2023-02-03 20:57:16 +01:00
Parent: 571981cb69
Commit: 01a5b07f85
16 changed files with 87 additions and 45 deletions
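
In short: before this commit each libuv callback acquired the Rust global lock with Rffi_glock() on entry and released it with Rffi_gunlock() on exit; after it, the event-loop thread holds the lock for as long as it is running C code and drops it only around the single blocking kernel wait (kevent, epoll_wait, port_getn, GetQueuedCompletionStatus). Below is a minimal, self-contained sketch of the resulting pattern, with a pthread mutex standing in for the Rust-side lock; it is an illustration, not code from this commit.

/* Event-loop thread: the lock is held while callbacks run and released only
 * while the thread is parked in the kernel. poll() stands in for the
 * backend-specific wait (kevent/epoll_wait/port_getn/GetQueuedCompletionStatus). */
#include <pthread.h>
#include <poll.h>

static pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;

static void eventLoop(struct pollfd* fds, nfds_t nfds)
{
    pthread_mutex_lock(&glock);          /* Glock_init(): lock held from startup */
    for (;;) {
        pthread_mutex_unlock(&glock);    /* Glock_beginBlockingCall() */
        int n = poll(fds, nfds, -1);     /* the only place this thread may block */
        pthread_mutex_lock(&glock);      /* Glock_endBlockingCall() */
        for (int i = 0; n > 0 && i < (int)nfds; i++) {
            /* dispatch ready callbacks here, with the lock held, so the
             * callbacks themselves no longer need to lock and unlock */
        }
    }
}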

View File

@@ -60,6 +60,7 @@
#include "util/events/Pipe.h"
#include "util/events/PipeServer.h"
#include "util/events/Timeout.h"
#include "util/events/libuv/Glock.h"
#include "util/Hex.h"
#include "util/log/FileWriterLog.h"
#include "util/log/IndirectLog.h"
@@ -432,6 +433,7 @@ void Core_init(struct Allocator* alloc,
int Core_main(int argc, char** argv)
{
Glock_init();
struct Except* eh = NULL;
if (argc != 3) {

View File

@@ -49,6 +49,7 @@
#include "util/events/Pipe.h"
#include "util/events/Process.h"
#include "util/events/FakeNetwork.h"
#include "util/events/libuv/Glock.h"
#include "util/Hex.h"
#include "util/log/Log.h"
#include "util/log/FileWriterLog.h"
@@ -597,6 +598,7 @@ static String* getPipePath(Dict* config, struct Allocator* alloc)
int cjdroute2_main(int argc, char** argv);
int cjdroute2_main(int argc, char** argv)
{
Glock_init();
#ifdef Log_KEYS
fprintf(stderr, "Log_LEVEL = KEYS, EXPECT TO SEE PRIVATE KEYS IN YOUR LOGS!\n");
#endif

View File

@@ -158,12 +158,10 @@ static void readCallbackB(struct TAPInterface_pvt* tap)
static void readCallback(uv_iocp_t* readIocp)
{
void *glock = Rffi_glock();
struct TAPInterface_pvt* tap =
Identity_check((struct TAPInterface_pvt*)
(((char*)readIocp) - offsetof(struct TAPInterface_pvt, readIocp)));
readCallbackB(tap);
Rffi_gunlock(glock);
}
static void writeCallbackB(struct TAPInterface_pvt* tap);
@@ -220,12 +218,10 @@ static void writeCallbackB(struct TAPInterface_pvt* tap)
static void writeCallback(uv_iocp_t* writeIocp)
{
void *glock = Rffi_glock();
struct TAPInterface_pvt* tap =
Identity_check((struct TAPInterface_pvt*)
(((char*)writeIocp) - offsetof(struct TAPInterface_pvt, writeIocp)));
writeCallbackB(tap);
Rffi_gunlock(glock);
}
static Iface_DEFUN sendMessage(struct Message* msg, struct Iface* iface)

View File

@@ -20,6 +20,7 @@
#include "uv.h"
#include "internal.h"
#include "util/events/libuv/Glock.h"
#include <assert.h>
#include <stdlib.h>
@@ -127,12 +128,14 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
spec.tv_nsec = (timeout % 1000) * 1000000;
}
Glock_beginBlockingCall();
nfds = kevent(loop->backend_fd,
events,
nevents,
events,
ARRAY_SIZE(events),
timeout == -1 ? NULL : &spec);
Glock_endBlockingCall();
/* Update loop->time unconditionally. It's tempting to skip the update when
* timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the

View File

@@ -20,6 +20,7 @@
#include "uv.h"
#include "internal.h"
#include "util/events/libuv/Glock.h"
#include <stdint.h>
#include <stdio.h>
@@ -189,20 +190,24 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
for (;;) {
if (!no_epoll_wait) {
Glock_beginBlockingCall();
nfds = uv__epoll_wait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);
Glock_endBlockingCall();
if (nfds == -1 && errno == ENOSYS) {
no_epoll_wait = 1;
continue;
}
} else {
Glock_beginBlockingCall();
nfds = uv__epoll_pwait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout,
NULL);
Glock_endBlockingCall();
}
/* Update loop->time unconditionally. It's tempting to skip the update when

View File

@@ -165,6 +165,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
nfds = 1;
saved_errno = 0;
Glock_beginBlockingCall();
if (port_getn(loop->backend_fd,
events,
ARRAY_SIZE(events),
@@ -178,6 +179,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
else
abort();
}
Glock_endBlockingCall();
/* Update loop->time unconditionally. It's tempting to skip the update when
* timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the

View File

@@ -28,7 +28,7 @@
#include <string.h>
#include <crtdbg.h>
#include "rust/cjdns_sys/Rffi.h"
#include "util/events/libuv/Glock.h"
#include "uv.h"
#include "internal.h"
#include "handle-inl.h"
@@ -181,8 +181,6 @@ static void* CJDNS_LOOP_LOCK = NULL;
uv_loop_t* uv_loop_new(void) {
uv_loop_t* loop;
CJDNS_LOOP_LOCK = Rffi_glock();
/* Initialize libuv itself first */
uv__once_init();
@@ -234,11 +232,13 @@ static void uv_poll(uv_loop_t* loop, int block) {
timeout = 0;
}
Glock_beginBlockingCall();
GetQueuedCompletionStatus(loop->iocp,
&bytes,
&key,
&overlapped,
timeout);
Glock_endBlockingCall();
if (overlapped) {
/* Package was dequeued */
@@ -270,12 +270,14 @@ static void uv_poll_ex(uv_loop_t* loop, int block) {
timeout = 0;
}
Glock_beginBlockingCall();
success = pGetQueuedCompletionStatusEx(loop->iocp,
overlappeds,
ARRAY_SIZE(overlappeds),
&count,
timeout,
FALSE);
Glock_endBlockingCall();
if (success) {
for (i = 0; i < count; i++) {
@@ -328,9 +330,6 @@ int uv_run(uv_loop_t *loop, uv_run_mode mode) {
uv_idle_invoke(loop);
uv_prepare_invoke(loop);
// sync with the Rust async Runtime.
Rffi_gunlock(CJDNS_LOOP_LOCK);
(*poll)(loop, loop->idle_handles == NULL &&
loop->pending_reqs_tail == NULL &&
loop->endgame_handles == NULL &&
@@ -339,9 +338,6 @@ int uv_run(uv_loop_t *loop, uv_run_mode mode) {
!QUEUE_EMPTY(&loop->active_reqs)) &&
!(mode & UV_RUN_NOWAIT));
// sync with the Rust async Runtime.
CJDNS_LOOP_LOCK = Rffi_glock();
uv_check_invoke(loop);
uv_process_endgames(loop);

View File

@@ -17,6 +17,7 @@
#include "util/CString.h"
#include "util/Js.h"
#include "util/events/libuv/Glock.h"
#include <stdio.h>
@@ -33,6 +34,7 @@ Js({
int RootTest_main(int argc, char** argv);
int main(int argc, char** argv)
{
Glock_init();
int runIt = 0;
int j = 0;
for (int i = 0; i < argc; i++) {

View File

@@ -22,6 +22,7 @@
#include "wire/Message.h"
#include "test/FuzzTest.h"
#include "util/Js.h"
#include "util/events/libuv/Glock.h"
#include <stdio.h>
#include <unistd.h>
@@ -265,12 +266,16 @@ static int main2(int argc, char** argv, struct Allocator* alloc, struct Random*
(int)((now - startTime)/1000000),
(int)((now - startTime)/1000)%1000);
}
// We need to drop the lock before we exit, otherwise the rust thread can't complete.
Glock_beginBlockingCall();
return 0;
}
int testcjdroute_main(int argc, char** argv);
int testcjdroute_main(int argc, char** argv)
{
Glock_init();
struct Allocator* alloc = Allocator_new(1<<24);
Allocator_free(alloc);
alloc = Allocator_new(1<<24);

View File

@@ -33,21 +33,17 @@ struct Event_pvt
static void handleEvent(uv_poll_t* handle, int status, int events)
{
void *glock = Rffi_glock();
struct Event_pvt* event =
Identity_check((struct Event_pvt*) (((char*)handle) - offsetof(struct Event_pvt, handler)));
if ((status == 0) && (events & UV_READABLE)) {
event->callback(event->callbackContext);
}
Rffi_gunlock(glock);
}
static void freeEvent2(uv_handle_t* handle)
{
void *glock = Rffi_glock();
Allocator_onFreeComplete((struct Allocator_OnFreeJob*)handle->data);
Rffi_gunlock(glock);
}
static int freeEvent(struct Allocator_OnFreeJob* job)

View File

@@ -71,14 +71,12 @@ static void calibrateTime(struct EventBase_pvt* base)
static void doNothing(uv_async_t* handle, int status)
{
void *glock = Rffi_glock();
struct EventBase_pvt* base = Identity_containerOf(handle, struct EventBase_pvt, uvAwakener);
if (base->running == 2) {
uv_stop(base->loop);
}
// title says it all
// printf("[%d] Do nothing\n", getpid());
Rffi_gunlock(glock);
}
static void blockTimer(uv_timer_t* timer, int status)
@@ -154,12 +152,10 @@ void EventBase_wakeup(void* eventBase)
static void countCallback(uv_handle_t* event, void* vEventCount)
{
void *glock = Rffi_glock();
int* eventCount = (int*) vEventCount;
if (!uv_is_closing(event)) {
*eventCount = *eventCount + 1;
}
Rffi_gunlock(glock);
}
int EventBase_eventCount(struct EventBase* eventBase)

util/events/libuv/Glock.c (new file, 37 lines)
View File

@@ -0,0 +1,37 @@
/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "util/events/libuv/Glock.h"
#include "rust/cjdns_sys/Rffi.h"
#include "util/Assert.h"
static void* glock = NULL;
void Glock_init() {
if (glock == NULL) {
glock = Rffi_glock();
}
}
void Glock_beginBlockingCall() {
Assert_true(glock != NULL);
Rffi_gunlock(glock);
glock = NULL;
}
void Glock_endBlockingCall() {
Assert_true(glock == NULL);
glock = Rffi_glock();
}

util/events/libuv/Glock.h (new file, 24 lines)
View File

@@ -0,0 +1,24 @@
/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include "util/Linker.h"
Linker_require("util/events/libuv/Glock.c")
void Glock_init(void);
void Glock_beginBlockingCall(void);
void Glock_endBlockingCall(void);
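
Taken together with the backend hunks above, the intended call pattern is: Glock_init() once at program start (Core_main, cjdroute2_main and the test mains), then Glock_beginBlockingCall() and Glock_endBlockingCall() immediately around each blocking syscall. The naming is from the caller's point of view: beginBlockingCall releases the global lock because the thread is about to park in the kernel, and endBlockingCall re-acquires it before any callbacks run. A sketch of a backend wait wrapped this way (epoll flavour, mirroring the uv__io_poll() hunk above; pollOnce and epollFd are illustrative names, not cjdns code):

#include <sys/epoll.h>
#include "util/events/libuv/Glock.h"

static void pollOnce(int epollFd)
{
    struct epoll_event events[64];

    Glock_beginBlockingCall();   /* drop the global lock: we may sleep in the kernel */
    int nfds = epoll_wait(epollFd, events, 64, 1000 /* ms */);
    Glock_endBlockingCall();     /* the lock is held again before callbacks are invoked */

    for (int i = 0; i < nfds; i++) {
        /* dispatch events[i] with the lock held */
    }
}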

View File

@@ -78,7 +78,6 @@ struct Pipe_WriteRequest_pvt {
static void sendMessageCallback(uv_write_t* uvReq, int error)
{
void *glock = Rffi_glock();
struct Pipe_WriteRequest_pvt* req = Identity_check((struct Pipe_WriteRequest_pvt*) uvReq);
if (error) {
Log_info(req->pipe->log, "Failed to write to pipe [%s] [%s]",
@@ -87,7 +86,6 @@ static void sendMessageCallback(uv_write_t* uvReq, int error)
req->pipe->queueLen -= Message_getLength(req->msg);
Assert_ifParanoid(req->pipe->queueLen >= 0);
Allocator_free(req->alloc);
Rffi_gunlock(glock);
}
static void sendMessage2(struct Pipe_WriteRequest_pvt* req)
@@ -175,13 +173,11 @@ static Iface_DEFUN sendMessage(struct Message* m, struct Iface* iface)
/** Asynchronous allocator freeing. */
static void onClose(uv_handle_t* handle)
{
void *glock = Rffi_glock();
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*)handle->data);
handle->data = NULL;
Log_debug(pipe->log, "Pipe closed");
Assert_true(pipe->closeHandlesOnFree && !pipe->peer.data);
Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->closeHandlesOnFree);
Rffi_gunlock(glock);
}
#if Pipe_PADDING_AMOUNT < 8
@@ -191,7 +187,6 @@ static void onClose(uv_handle_t* handle)
static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
void *glock = Rffi_glock();
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) stream->data);
// Grab out the msg which was placed there by allocate()
@@ -227,7 +222,6 @@ static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
if (pipe->blockFreeInsideCallback) {
Allocator_onFreeComplete((struct Allocator_OnFreeJob*) pipe->blockFreeInsideCallback);
}
Rffi_gunlock(glock);
}
static void incoming2(uv_pipe_t* stream, ssize_t nread, const uv_buf_t* buf, uv_handle_type _)
@@ -237,7 +231,6 @@ static void incoming2(uv_pipe_t* stream, ssize_t nread, const uv_buf_t* buf, uv_handle_type _)
static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
{
void *glock = Rffi_glock();
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) handle->data);
size = Pipe_BUFFER_CAP;
@@ -248,7 +241,6 @@ static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
buf->base = msg->msgbytes;
buf->len = size;
Rffi_gunlock(glock);
}
static int startPipe(struct Pipe_pvt* pipe)
@@ -262,7 +254,6 @@ static int startPipe(struct Pipe_pvt* pipe)
static void connected(uv_connect_t* req, int status)
{
void *glock = Rffi_glock();
uv_stream_t* link = req->handle;
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) link->data);
Log_debug(pipe->log, "Pipe [%s] established connection", pipe->pub.fullName);
@@ -292,7 +283,6 @@ static void connected(uv_connect_t* req, int status)
pipe->bufferedRequest = NULL;
}
}
Rffi_gunlock(glock);
}
static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)

View File

@@ -96,13 +96,11 @@ static Iface_DEFUN incomingFromClient(struct Message* msg, struct Iface* iface)
/** Asynchronous allocator freeing. */
static void onClose(uv_handle_t* handle)
{
void *glock = Rffi_glock();
struct PipeServer_pvt* psp = Identity_check((struct PipeServer_pvt*)handle->data);
handle->data = NULL;
if (psp->closeHandlesOnFree && !psp->server.data) {
Allocator_onFreeComplete((struct Allocator_OnFreeJob*) psp->closeHandlesOnFree);
}
Rffi_gunlock(glock);
}
static struct Pipe* getPipe(struct PipeServer_pvt* psp, struct Allocator* alloc)
@@ -141,20 +139,17 @@ static void pipeOnClose(struct Pipe* p, int status)
static void listenCallback(uv_stream_t* server, int status)
{
void *glock = Rffi_glock();
uv_pipe_t* pServer = (uv_pipe_t*) server;
struct PipeServer_pvt* psp = Identity_containerOf(pServer, struct PipeServer_pvt, server);
if (status == -1) {
Log_info(psp->log, "failed to accept pipe connection [%s] [%s]",
psp->pub.fullName, uv_strerror(status));
Rffi_gunlock(glock);
return;
}
struct Allocator* pipeAlloc = Allocator_child(psp->alloc);
struct Pipe* p = getPipe(psp, pipeAlloc);
if (p == NULL) {
Allocator_free(pipeAlloc);
Rffi_gunlock(glock);
return;
}
struct Client* cli = Allocator_calloc(pipeAlloc, sizeof(struct Client), 1);
@@ -180,7 +175,6 @@ static void listenCallback(uv_stream_t* server, int status)
psp->pub.onConnection(&psp->pub, &cli->addr);
}
cli->pipe->onClose = pipeOnClose;
Rffi_gunlock(glock);
}
static int closeHandlesOnFree(struct Allocator_OnFreeJob* job)

View File

@@ -65,7 +65,6 @@ static struct UDPAddrIface_pvt* ifaceForHandle(uv_udp_t* handle)
static void sendComplete(uv_udp_send_t* uvReq, int error)
{
void *glock = Rffi_glock();
struct UDPAddrIface_WriteRequest_pvt* req =
Identity_check((struct UDPAddrIface_WriteRequest_pvt*) uvReq);
if (error) {
@@ -76,7 +75,6 @@ static void sendComplete(uv_udp_send_t* uvReq, int error)
req->udp->queueLen -= Message_getLength(req->msg);
Assert_true(req->udp->queueLen >= 0);
Allocator_free(req->alloc);
Rffi_gunlock(glock);
}
@@ -144,7 +142,6 @@ static void incoming(uv_udp_t* handle,
const struct sockaddr* addr,
unsigned flags)
{
void *glock = Rffi_glock();
struct UDPAddrIface_pvt* context = ifaceForHandle(handle);
context->inCallback = 1;
@@ -182,12 +179,10 @@ static void incoming(uv_udp_t* handle,
if (context->blockFreeInsideCallback) {
Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->blockFreeInsideCallback);
}
Rffi_gunlock(glock);
}
static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
{
void *glock = Rffi_glock();
struct UDPAddrIface_pvt* context = ifaceForHandle((uv_udp_t*)handle);
size = UDPAddrIface_BUFFER_CAP;
@@ -203,16 +198,13 @@ static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
buf->base = msg->msgbytes;
buf->len = size;
Rffi_gunlock(glock);
}
static void onClosed(uv_handle_t* wasClosed)
{
void *glock = Rffi_glock();
struct UDPAddrIface_pvt* context =
Identity_check((struct UDPAddrIface_pvt*) wasClosed->data);
Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->closeHandleOnFree);
Rffi_gunlock(glock);
}
static int closeHandleOnFree(struct Allocator_OnFreeJob* job)