0
0
mirror of https://github.com/cjdelisle/cjdns synced 2025-10-06 00:32:50 +02:00

Merge branch 'crashey' of github.com:cjdelisle/cjdns into crashey

This commit is contained in:
Caleb James DeLisle
2024-10-24 13:43:39 +00:00
5 changed files with 61 additions and 106 deletions

View File

@@ -263,18 +263,20 @@ static void onPingResponse(String* data, uint32_t milliseconds, void* vping)
uint32_t version = p->context->incomingVersion;
struct SwitchPinger_Response* resp =
Allocator_calloc(p->pub.pingAlloc, sizeof(struct SwitchPinger_Response), 1);
resp->version = p->context->incomingVersion;
resp->res = err;
resp->label = label;
resp->data = data;
resp->milliseconds = milliseconds;
resp->version = version;
Bits_memcpy(resp->key, p->context->incomingKey, 32);
Bits_memcpy(&resp->snode, &p->context->incomingSnodeAddr, sizeof(struct Address));
resp->kbpsLimit = p->context->incomingSnodeKbps;
resp->rpath = p->context->rpath;
resp->ping = &p->pub;
Bits_memcpy(&resp->lladdr, &p->context->lladdrMsg, sizeof p->context->lladdrMsg);
if (err != SwitchPinger_Result_TIMEOUT) {
resp->version = p->context->incomingVersion;
resp->data = data;
resp->version = version;
Bits_memcpy(resp->key, p->context->incomingKey, 32);
Bits_memcpy(&resp->snode, &p->context->incomingSnodeAddr, sizeof(struct Address));
resp->kbpsLimit = p->context->incomingSnodeKbps;
resp->rpath = p->context->rpath;
Bits_memcpy(&resp->lladdr, &p->context->lladdrMsg, sizeof p->context->lladdrMsg);
}
p->onResponse(resp, p->pub.onResponseContext);
}

View File

@@ -15,8 +15,10 @@
#include "admin/Admin.h"
#include "benc/String.h"
#include "benc/Dict.h"
#include "dht/Address.h"
#include "net/SwitchPinger.h"
#include "net/SwitchPinger_admin.h"
#include "util/Bits.h"
#include "util/Endian.h"
#include "util/AddrTools.h"
#include "crypto/Key.h"
@@ -50,10 +52,12 @@ static void adminPingOnResponse(struct SwitchPinger_Response* resp, void* vping)
uint8_t path[20] = {0};
AddrTools_printPath(path, resp->label);
String* pathStr = String_new(path, pingAlloc);
Dict_putStringC(rd, "rpath", pathStr, pingAlloc);
Dict_putStringC(rd, "responsePath", pathStr, pingAlloc);
}
Dict_putIntC(rd, "version", resp->version, pingAlloc);
if (resp->version) {
Dict_putIntC(rd, "version", resp->version, pingAlloc);
}
Dict_putIntC(rd, "ms", resp->milliseconds, pingAlloc);
Dict_putStringC(rd, "result", SwitchPinger_resultString(resp->res), pingAlloc);
Dict_putStringC(rd, "path", ping->path, pingAlloc);
@@ -65,6 +69,17 @@ static void adminPingOnResponse(struct SwitchPinger_Response* resp, void* vping)
Dict_putStringC(rd, "key", Key_stringify(resp->key, pingAlloc), pingAlloc);
}
if (!Bits_isZero(&resp->snode, sizeof resp->snode)) {
Dict_putStringC(rd, "snode", Address_toString(&resp->snode, pingAlloc), pingAlloc);
}
if (resp->rpath) {
uint8_t path[20] = {0};
AddrTools_printPath(path, resp->rpath);
String* pathStr = String_new(path, pingAlloc);
Dict_putStringC(rd, "rpath", pathStr, pingAlloc);
}
if (!Bits_isZero(&resp->lladdr, sizeof resp->lladdr)) {
struct Sockaddr_storage ss;
if (resp->lladdr.addr.udp4.type == Control_LlAddr_Udp4_TYPE) {
@@ -100,11 +115,13 @@ static void adminPing(Dict* args, void* vcontext, String* txid, struct Allocator
String* data = Dict_getStringC(args, "data");
int64_t* keyPing = Dict_getIntC(args, "keyPing");
int64_t* lladdr = Dict_getIntC(args, "lladdr");
int64_t* snode = Dict_getIntC(args, "snode");
int64_t* rpath = Dict_getIntC(args, "rpath");
uint32_t timeout = (timeoutPtr) ? *timeoutPtr : DEFAULT_TIMEOUT;
uint64_t path;
char* err = NULL;
if (keyPing && *keyPing && lladdr && *lladdr) {
err = "Cannot be both keyping and lladdr";
if ((keyPing && *keyPing != 0) + (lladdr && *lladdr != 0) + (snode && *snode != 0) > 1) {
err = "Can only be one of keyPing, lladdr, OR snode";
} else if (pathStr->len != 19 || AddrTools_parsePath(&path, (uint8_t*) pathStr->bytes)) {
err = "path was not parsable.";
} else {
@@ -121,6 +138,10 @@ static void adminPing(Dict* args, void* vcontext, String* txid, struct Allocator
ping->type = SwitchPinger_Type_KEYPING;
} else if (lladdr && *lladdr) {
ping->type = SwitchPinger_Type_LLADDR;
} else if (snode && *snode) {
ping->type = SwitchPinger_Type_GETSNODE;
} else if (rpath && *rpath) {
ping->type = SwitchPinger_Type_RPATH;
}
ping->onResponseContext = Allocator_clone(ping->pingAlloc, (&(struct Ping) {
.context = context,
@@ -154,5 +175,7 @@ void SwitchPinger_admin_register(struct SwitchPinger* sp,
{ .name = "data", .required = 0, .type = "String" },
{ .name = "keyPing", .required = 0, .type = "Int" },
{ .name = "lladdr", .required = 0, .type = "Int" },
{ .name = "snode", .required = 0, .type = "Int" },
{ .name = "rpath", .required = 0, .type = "Int" },
}), admin);
}

View File

@@ -13,10 +13,7 @@ use anyhow::{Context, Result};
use std::net::SocketAddr;
const SEND_MSGS_LIMIT: usize = 16;
const RECV_MSGS_LIMIT: usize = 16;
const TO_GO_OUT_QUEUE: usize = 64;
const INCOMING_QUEUE: usize = 64;
const BUFFER_CAP: usize = 3496;
const PADDING_AMOUNT: usize = 512;
@@ -48,9 +45,6 @@ struct UDPAddrIfaceInternal {
to_go_out_recv: Mutex<Receiver<(Message,SocketAddr)>>,
to_go_out_send: Sender<(Message,SocketAddr)>,
incoming_recv: Mutex<Receiver<Message>>,
incoming_send: Sender<Message>,
send_worker_states: Vec<AtomicI32>,
recv_worker_states: Vec<AtomicI32>,
}
@@ -96,7 +90,6 @@ impl UDPAddrIfaceInternal {
self.recv_worker_states[n].store(state as i32, std::sync::atomic::Ordering::Relaxed);
}
async fn recv_worker(self: Arc<Self>, n: usize) {
let mut recv_msgs = Vec::with_capacity(RECV_MSGS_LIMIT);
let mut message = None;
loop {
// If we get the lock on the receive channel, drain it and send back until it's empty
@@ -109,50 +102,29 @@ impl UDPAddrIfaceInternal {
msg
};
self.recv_worker_set_state(n, RecvWorkerState::RecvBatch);
tokio::select! {
res = self.udp.recv_from(msg.bytes_mut()) => {
self.recv_worker_set_state(n, RecvWorkerState::RecievedBatch);
log::trace!("recv_worker got a recv_from");
match res {
Ok((byte_count,from)) => {
log::trace!("Ok UDP packet from {from} with {byte_count} bytes");
if byte_count == BUFFER_CAP {
log::warn!("Truncated incoming message from {from}");
}
msg.set_len(byte_count).unwrap();
let addr = Sockaddr::from(&from);
msg.push_bytes(addr.bytes()).unwrap();
match self.incoming_send.send(msg).await {
Ok(()) => {
log::trace!("UDP packet forwarded to receiver");
}
Err(e) => {
log::info!("Lost message from {from} because: {e}");
}
}
}
let res = self.udp.recv_from(msg.bytes_mut()).await;
self.recv_worker_set_state(n, RecvWorkerState::RecievedBatch);
log::trace!("recv_worker got a recv_from");
match res {
Ok((byte_count,from)) => {
log::trace!("Ok UDP packet from {from} with {byte_count} bytes");
if byte_count == BUFFER_CAP {
log::warn!("Truncated incoming message from {from}");
}
msg.set_len(byte_count).unwrap();
let addr = Sockaddr::from(&from);
msg.push_bytes(addr.bytes()).unwrap();
match self.iface.send(msg) {
Ok(()) => {
log::trace!("UDP receiver thread sent packet successfully");
},
Err(e) => {
log::warn!("Error receiving UDP message {e}");
log::debug!("Error processing packet: {e}");
}
}
}
mut recv = self.incoming_recv.lock() => {
self.recv_worker_set_state(n, RecvWorkerState::IfaceSendOne);
message = Some(msg);
log::trace!("{:?} UDP receiver thread waiting", self.udp.local_addr());
recv.recv_many(&mut recv_msgs, RECV_MSGS_LIMIT).await;
log::trace!("UDP receiver thread got recv_many");
self.recv_worker_set_state(n, RecvWorkerState::IfaceSendTwo);
for msg in recv_msgs.drain(..) {
match self.iface.send(msg) {
Ok(()) => {
log::trace!("UDP receiver thread sent packet successfully");
},
Err(e) => {
log::debug!("Error processing packet: {e}");
}
}
}
Err(e) => {
log::warn!("Error receiving UDP message {e}");
}
}
}
@@ -181,8 +153,6 @@ impl UDPAddrIface {
let (mut iface, iface_pvt) = iface::new("UDPAddrIface");
let (tgo, tgo_r) =
tokio::sync::mpsc::channel(TO_GO_OUT_QUEUE);
let (incoming, incoming_r) =
tokio::sync::mpsc::channel(INCOMING_QUEUE);
let workers = num_cpus::get();
let workers = if workers < 2 {
@@ -197,8 +167,6 @@ impl UDPAddrIface {
udp,
to_go_out_recv: Mutex::new(tgo_r),
to_go_out_send: tgo,
incoming_recv: Mutex::new(incoming_r),
incoming_send: incoming,
send_worker_states: (0..workers).map(|_|AtomicI32::new(0)).collect(),
recv_worker_states: (0..workers).map(|_|AtomicI32::new(0)).collect(),
});

View File

@@ -47,10 +47,13 @@ pub fn list_to_c<'a>(alloc: *mut Allocator_t, v: &Vec<Value<'a>>) -> *mut List_t
pub fn string_to_c<'a>(alloc: *mut Allocator_t, v: impl Into<&'a[u8]>) -> *mut String_t {
let v: &'a[u8] = v.into();
let bytes = allocator::adopt(alloc, Vec::from(v));
let mut v = Vec::from(v);
let len = v.len();
v.push(0);
let bytes = allocator::adopt(alloc, v);
let bytes = unsafe { (*bytes)[..].as_mut_ptr() };
allocator::adopt(alloc, String_t{
len: v.len(),
len,
bytes: bytes as _,
})
}

View File

@@ -1,41 +0,0 @@
/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef GetPeersResponder_H
#define GetPeersResponder_H
#include "memory/Allocator.h"
#include "util/log/Log.h"
#include "subnode/AddrSet.h"
#include "subnode/MsgCore.h"
#include "subnode/BoilerplateResponder.h"
#include "switch/EncodingScheme.h"
#include "util/Linker.h"
Linker_require("subnode/GetPeersResponder.c")
struct GetPeersResponder
{
int unused;
};
struct GetPeersResponder* GetPeersResponder_new(struct Allocator* allocator,
struct Log* log,
struct AddrSet* peers,
struct Address* selfAddr,
struct MsgCore* msgCore,
struct BoilerplateResponder* br,
struct EncodingScheme* myScheme);
#endif