[WS] Implement wslay unbuffered message parsing
Ensure we never read more than we can store during poll. Raise default max packets to 4096 to maintain the same performance for the first 2048 packets.
This commit is contained in:
@ -295,6 +295,7 @@ Error WSLPeer::_do_server_handshake() {
|
||||
resolver.stop();
|
||||
// Response sent, initialize wslay context.
|
||||
wslay_event_context_server_init(&wsl_ctx, &_wsl_callbacks, this);
|
||||
wslay_event_config_set_no_buffering(wsl_ctx, 1);
|
||||
wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size);
|
||||
in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets);
|
||||
packet_buffer.resize(inbound_buffer_size);
|
||||
@ -403,6 +404,7 @@ void WSLPeer::_do_client_handshake() {
|
||||
ERR_FAIL_MSG("Invalid response headers.");
|
||||
}
|
||||
wslay_event_context_client_init(&wsl_ctx, &_wsl_callbacks, this);
|
||||
wslay_event_config_set_no_buffering(wsl_ctx, 1);
|
||||
wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size);
|
||||
in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets);
|
||||
packet_buffer.resize(inbound_buffer_size);
|
||||
@ -568,8 +570,15 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data,
|
||||
wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
|
||||
return -1;
|
||||
}
|
||||
// Make sure we don't read more than what our buffer can hold.
|
||||
size_t buffer_limit = MIN(peer->in_buffer.payload_space_left(), peer->in_buffer.packets_space_left() * 2); // The minimum size of a websocket message is 2 bytes.
|
||||
size_t to_read = MIN(len, buffer_limit);
|
||||
if (to_read == 0) {
|
||||
wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
|
||||
return -1;
|
||||
}
|
||||
int read = 0;
|
||||
Error err = conn->get_partial_data(data, len, read);
|
||||
Error err = conn->get_partial_data(data, to_read, read);
|
||||
if (err != OK) {
|
||||
print_verbose("Websocket get data error: " + itos(err) + ", read (should be 0!): " + itos(read));
|
||||
wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
|
||||
@ -582,6 +591,37 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data,
|
||||
return read;
|
||||
}
|
||||
|
||||
void WSLPeer::_wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data) {
|
||||
WSLPeer *peer = (WSLPeer *)user_data;
|
||||
uint8_t op = arg->opcode;
|
||||
if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) {
|
||||
// Get ready to process a data package.
|
||||
PendingMessage &pm = peer->pending_message;
|
||||
pm.opcode = op;
|
||||
pm.payload_size = arg->payload_length;
|
||||
}
|
||||
}
|
||||
|
||||
void WSLPeer::_wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data) {
|
||||
WSLPeer *peer = (WSLPeer *)user_data;
|
||||
PendingMessage &pm = peer->pending_message;
|
||||
if (pm.opcode != 0) {
|
||||
// Only write the payload.
|
||||
peer->in_buffer.write_packet(arg->data, arg->data_length, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
void WSLPeer::_wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data) {
|
||||
WSLPeer *peer = (WSLPeer *)user_data;
|
||||
PendingMessage &pm = peer->pending_message;
|
||||
if (pm.opcode != 0) {
|
||||
// Only write the packet (since it's now completed).
|
||||
uint8_t is_string = pm.opcode == WSLAY_TEXT_FRAME ? 1 : 0;
|
||||
peer->in_buffer.write_packet(nullptr, pm.payload_size, &is_string);
|
||||
pm.clear();
|
||||
}
|
||||
}
|
||||
|
||||
ssize_t WSLPeer::_wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data) {
|
||||
WSLPeer *peer = (WSLPeer *)user_data;
|
||||
Ref<StreamPeer> conn = peer->connection;
|
||||
@ -627,28 +667,19 @@ void WSLPeer::_wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct w
|
||||
return;
|
||||
}
|
||||
|
||||
if (peer->ready_state == STATE_CLOSING) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) {
|
||||
// Message.
|
||||
uint8_t is_string = arg->opcode == WSLAY_TEXT_FRAME ? 1 : 0;
|
||||
peer->in_buffer.write_packet(arg->msg, arg->msg_length, &is_string);
|
||||
}
|
||||
if (op == WSLAY_PONG) {
|
||||
peer->heartbeat_waiting = false;
|
||||
}
|
||||
// Pong.
|
||||
// Ping, or message (already parsed in chunks).
|
||||
}
|
||||
|
||||
wslay_event_callbacks WSLPeer::_wsl_callbacks = {
|
||||
_wsl_recv_callback,
|
||||
_wsl_send_callback,
|
||||
_wsl_genmask_callback,
|
||||
nullptr, /* on_frame_recv_start_callback */
|
||||
nullptr, /* on_frame_recv_callback */
|
||||
nullptr, /* on_frame_recv_end_callback */
|
||||
_wsl_recv_start_callback,
|
||||
_wsl_frame_recv_chunk_callback,
|
||||
_wsl_frame_recv_end_callback,
|
||||
_wsl_msg_recv_callback
|
||||
};
|
||||
|
||||
@ -836,6 +867,7 @@ void WSLPeer::close(int p_code, String p_reason) {
|
||||
heartbeat_waiting = false;
|
||||
in_buffer.clear();
|
||||
packet_buffer.resize(0);
|
||||
pending_message.clear();
|
||||
}
|
||||
|
||||
IPAddress WSLPeer::get_connected_host() const {
|
||||
|
||||
Reference in New Issue
Block a user