diff --git a/modules/janus/janus_common.c b/modules/janus/janus_common.c index f0c45ed9d5..13432a562e 100644 --- a/modules/janus/janus_common.c +++ b/modules/janus/janus_common.c @@ -112,6 +112,10 @@ int janus_raise_event(janus_connection *conn, cJSON *request) } full_json = cJSON_Print(request); + if (!full_json) { + LM_ERR("cJSON_Print failed\n"); + goto err_free_params; + } cJSON_Minify(full_json); full_json_s.s = full_json; full_json_s.len = strlen(full_json); @@ -191,10 +195,15 @@ int handle_janus_json_request(janus_connection *conn, cJSON *request) } full_json = cJSON_Print(request); + if (!full_json) { + LM_ERR("cJSON_Print failed\n"); + return 1; + } cJSON_Minify(full_json); reply->text.s = shm_strdup(full_json); if (reply->text.s == NULL) { + pkg_free(full_json); /* we're out of mem, let the requestor timeout, don't disconnect janus */ return 1; } diff --git a/modules/janus/janus_mod.c b/modules/janus/janus_mod.c index c0e1cd8307..cc856fa7ac 100644 --- a/modules/janus/janus_mod.c +++ b/modules/janus/janus_mod.c @@ -208,12 +208,14 @@ static int w_janus_send_request(struct sip_msg *msg, str *janus_id,str *request, if ((conn = get_janus_connection_by_id(janus_id)) == NULL) { LM_ERR("Unknown JANUS ID %.*s\n",janus_id->len,janus_id->s); + cJSON_Delete(j_request); return -1; } LM_DBG("Found our conn, prep to send out %.*s !! \n",request->len,request->s); reply_id = janus_ipc_send_request(conn,j_request); + cJSON_Delete(j_request); /* tree was serialized to shm; free pkg copy */ if (reply_id == 0) { LM_ERR("Failed to queue request %.*s towards %.*s\n", request->len,request->s, diff --git a/modules/janus/janus_proc.c b/modules/janus/janus_proc.c index 8f30bf7e62..4a689b2043 100644 --- a/modules/janus/janus_proc.c +++ b/modules/janus/janus_proc.c @@ -205,6 +205,13 @@ void janus_reconnects(void) close(sock->fd); sock->fd = -1; + /* Clean up any pending fragment state from previous connection */ + if (sock->con_req.frag_buf) { + pkg_free(sock->con_req.frag_buf); + sock->con_req.frag_buf = NULL; + sock->con_req.frag_len = 0; + sock->con_req.frag_size = 0; + } sock->state = S_CONN_OK; if (janus_reconnect(sock) < 0) { @@ -282,14 +289,23 @@ uint64_t janus_ipc_send_request(janus_connection *sock, cJSON *janus_cmd) lock_stop_write(sock->lists_lk); full_cmd.s = cJSON_Print(janus_cmd); + if (!full_cmd.s) { + shm_free(cmd); + LM_ERR("cJSON_Print failed (pkg OOM)\n"); + return 0; + } full_cmd.len = strlen(full_cmd.s); if (shm_nt_str_dup(&cmd->janus_cmd, &full_cmd) != 0) { + pkg_free(full_cmd.s); shm_free(cmd); LM_ERR("oom\n"); return 0; } + /* cJSON_Print() allocates from pkg via module hooks; free after shm copy */ + pkg_free(full_cmd.s); + janus_transaction_id = cmd->janus_transaction_id; if (ipc_send_job(*janus_mgr_process_no, ipc_hdl_run_janus, cmd) != 0) { diff --git a/modules/janus/ws_common.h b/modules/janus/ws_common.h index aafbe8c764..a6db601980 100644 --- a/modules/janus/ws_common.h +++ b/modules/janus/ws_common.h @@ -304,12 +304,6 @@ static enum ws_close_code inline janus_ws_parse(struct janus_ws_req *req) goto update_parsed; } - if (!WS_IS_FIN(req)) { - LM_ERR("We do not support fragmemntation yet. Dropping...\n"); - req->tcp.error = TCP_READ_ERROR; - return WS_ERR_POLICY; - } - /* check if it is an operation that we support */ req->op = WS_OPCODE(req); switch (req->op) { @@ -318,7 +312,15 @@ static enum ws_close_code inline janus_ws_parse(struct janus_ws_req *req) case WS_OP_CLOSE: case WS_OP_PING: case WS_OP_PONG: - /* continue to read whole packet */ + /* control frames must not be fragmented (RFC 6455 5.5) */ + if (!WS_IS_FIN(req) && + (req->op == WS_OP_CLOSE || req->op == WS_OP_PING || + req->op == WS_OP_PONG)) { + LM_ERR("Fragmented control frame (opcode %d)\n", req->op); + return WS_ERR_PROTO; + } + break; + case WS_OP_CONT: /* continuation frame for fragmented messages */ break; default: LM_ERR("Unsupported WebSocket opcode: %d\n", req->op); @@ -402,14 +404,184 @@ static enum ws_close_code inline janus_ws_parse(struct janus_ws_req *req) (_req)->is_masked = 0; \ (_req)->complete=0; \ (_req)->body=NULL; \ + /* NOTE: frag_* fields intentionally NOT reset here. + * Fragment state must persist across frame boundaries -- + * this macro is called between each frame in a sequence. */ \ } while(0) +static inline void janus_ws_frag_cleanup(struct janus_ws_req *req) +{ + if (req->frag_buf) { + pkg_free(req->frag_buf); + req->frag_buf = NULL; + req->frag_len = 0; + req->frag_size = 0; + } +} + +/* + * Handle WebSocket fragment reassembly per RFC 6455 Section 5.4. + * + * Called after janus_ws_parse() for each complete frame. Accumulates + * fragments into req->frag_buf and delivers the reassembled message + * when the final frame (FIN=1) arrives. + * + * Control frames (PING/PONG/CLOSE) pass through mid-fragment per Section 5.4: + * "Control frames MAY be injected in the middle of a fragmented message." + * + * Returns: + * 0 -- complete message ready in req->buf / req->buf_len + * 1 -- fragment accumulated, parser reset for next frame, read more + * -1 -- error (fragment state cleaned up) + */ +static int janus_ws_handle_frag(struct janus_ws_req *req) +{ + int is_fin = WS_IS_FIN(req); + unsigned int payload_len = req->tcp.content_len; + long size; + + switch (req->op) { + case WS_OP_TEXT: + case WS_OP_BIN: + if (is_fin && !req->frag_buf) { + /* Unfragmented message -- normal path */ + return 0; + } + + if (!is_fin) { + /* First fragment of a new fragmented message */ + if (req->frag_buf) { + LM_WARN("new fragmented message while previous incomplete, " + "dropping old fragments\n"); + pkg_free(req->frag_buf); + } + if (payload_len > WS_MAX_FRAG_SIZE) { + LM_ERR("first fragment too large: %u\n", payload_len); + req->frag_buf = NULL; + req->frag_len = 0; + return -1; + } + if (!req->tcp.body) { + LM_ERR("first fragment has NULL body\n"); + req->frag_buf = NULL; + req->frag_len = 0; + return -1; + } + /* Allocate with 2x headroom to reduce reallocs */ + { + unsigned int alloc_size = (payload_len + 1) * 2; + if (alloc_size > WS_MAX_FRAG_SIZE + 1) + alloc_size = WS_MAX_FRAG_SIZE + 1; + req->frag_buf = pkg_malloc(alloc_size); + if (!req->frag_buf) { + LM_ERR("oom for fragment buffer (%u bytes)\n", alloc_size); + req->frag_len = 0; + return -1; + } + memcpy(req->frag_buf, req->tcp.body, payload_len); + req->frag_len = payload_len; + req->frag_size = alloc_size; + req->frag_op = req->op; + } + + LM_DBG("started fragment accumulation: %u bytes, opcode %d\n", + payload_len, req->op); + goto accum_reset; + } + + /* FIN=1 but frag_buf exists -- new unfragmented msg replaces stale frags */ + LM_WARN("unfragmented text/bin frame with pending fragments, " + "dropping fragments\n"); + janus_ws_frag_cleanup(req); + return 0; + + case WS_OP_CONT: + if (!req->frag_buf) { + LM_ERR("continuation frame received without initial fragment\n"); + return -1; + } + + /* Check reassembled size limit */ + if (req->frag_len + payload_len > WS_MAX_FRAG_SIZE) { + LM_ERR("reassembled message too large: %u + %u > %u\n", + req->frag_len, payload_len, WS_MAX_FRAG_SIZE); + janus_ws_frag_cleanup(req); + return -1; + } + + if (!req->tcp.body && payload_len > 0) { + LM_ERR("continuation frame has NULL body with len %u\n", + payload_len); + janus_ws_frag_cleanup(req); + return -1; + } + + /* Grow frag_buf if needed -- doubling strategy to reduce realloc frequency */ + if (req->frag_len + payload_len + 1 > req->frag_size) { + /* Doubling strategy to reduce realloc frequency */ + unsigned int needed = req->frag_len + payload_len + 1; + unsigned int new_size = req->frag_size * 2; + if (new_size < needed) + new_size = needed; + if (new_size > WS_MAX_FRAG_SIZE + 1) + new_size = WS_MAX_FRAG_SIZE + 1; + char *new_buf = pkg_realloc(req->frag_buf, new_size); + if (!new_buf) { + LM_ERR("oom growing fragment buffer to %u\n", new_size); + janus_ws_frag_cleanup(req); + return -1; + } + req->frag_buf = new_buf; + req->frag_size = new_size; + } + + if (payload_len > 0) + memcpy(req->frag_buf + req->frag_len, req->tcp.body, payload_len); + req->frag_len += payload_len; + + if (!is_fin) { + LM_DBG("accumulated continuation fragment: +%u bytes (total %u)\n", + payload_len, req->frag_len); + goto accum_reset; + } + + /* Final fragment -- deliver reassembled message */ + req->frag_buf[req->frag_len] = '\0'; + req->buf = req->frag_buf; + req->buf_len = req->frag_len; + req->op = req->frag_op; + + LM_DBG("fragment reassembly complete: %u bytes, original opcode %d\n", + req->frag_len, req->frag_op); + return 0; + + default: + /* Control frames (CLOSE/PING/PONG) pass through unchanged + * even mid-fragment (RFC 6455 Section 5.4) */ + return 0; + } + +accum_reset: + /* Reset parser state for the next frame, preserving frag_* fields */ + size = req->tcp.pos - req->tcp.parsed; + if (size < 0) { + LM_BUG("negative leftover size %ld in frag accumulation\n", size); + janus_ws_frag_cleanup(req); + return -1; + } + if (size) + memmove(req->tcp.buf, req->tcp.parsed, size); + + init_janus_ws_req(req, size); + return 1; +} + static int janus_connection_read_data(janus_connection *sock, struct janus_ws_req *req, int _max_msg_chunks) { int ret=-1; long size=0; enum ws_close_code ret_code = WS_ERR_NONE; - unsigned char bk; + unsigned char bk = 0; if (req->tcp.complete) { /* sanity mask checks */ @@ -422,6 +594,20 @@ static int janus_connection_read_data(janus_connection *sock, struct janus_ws_re goto error; } + /* Handle fragment accumulation before processing */ + { + int frag_ret = janus_ws_handle_frag(req); + if (frag_ret == 1) { + /* Fragment accumulated, parser reset, read more */ + sock->msg_attempts = 0; + return 1; + } else if (frag_ret < 0) { + ret_code = WS_ERR_POLICY; + goto error; + } + /* frag_ret == 0: complete message ready */ + } + switch (req->op) { case WS_OP_CLOSE: if (req->tcp.content_len) { @@ -452,7 +638,7 @@ static int janus_connection_read_data(janus_connection *sock, struct janus_ws_re /* release the connextion */ sock->state = S_CONN_EOF; - /* we are trying to populate the handler ID, close if not expected */ + janus_ws_frag_cleanup(req); return -1; case WS_OP_PING: @@ -466,19 +652,24 @@ static int janus_connection_read_data(janus_connection *sock, struct janus_ws_re case WS_OP_TEXT: case WS_OP_BIN: - LM_DBG("read complete [%.*s] \n",(int)(req->tcp.parsed-req->tcp.body),req->tcp.body); + if (req->frag_buf) { + /* Reassembled fragmented message -- buf/buf_len set by frag handler */ + LM_DBG("read reassembled [%.*s]\n", req->buf_len, req->buf); + } else { + /* Normal unfragmented message */ + LM_DBG("read complete [%.*s] \n", + (int)(req->tcp.parsed-req->tcp.body), req->tcp.body); - bk = *req->tcp.parsed; - *req->tcp.parsed = 0; + bk = *req->tcp.parsed; + *req->tcp.parsed = 0; + + req->buf = req->tcp.body; + req->buf_len = req->tcp.parsed-req->tcp.body; + } - req->buf = req->tcp.body; - req->buf_len = req->tcp.parsed-req->tcp.body; - janus_brief_parse_msg((struct janus_req *)req); if (req->complete) { - *req->tcp.parsed=0; - /* prepare for next request */ size=req->tcp.pos-req->tcp.parsed; @@ -489,15 +680,27 @@ static int janus_connection_read_data(janus_connection *sock, struct janus_ws_re if (handle_janus_json_request(sock, req->body) <0) { LM_ERR("Failed to process janus request \n"); cJSON_Delete(req->body); + janus_ws_frag_cleanup(req); return -1; } cJSON_Delete(req->body); - *req->tcp.parsed = bk; + if (req->frag_buf) { + janus_ws_frag_cleanup(req); + } else { + *req->tcp.parsed = bk; + } /* we have received our data */ ret = 0; + } else if (req->frag_buf) { + /* Parse failed on reassembled message -- all data was present, + * so retrying won't help. Clean up and report error. */ + LM_ERR("failed to parse reassembled fragmented message\n"); + janus_ws_frag_cleanup(req); + ret_code = WS_ERR_BADDATA; + goto error; } else { /* we need to read some more */ ret = 1; @@ -529,6 +732,7 @@ static int janus_connection_read_data(janus_connection *sock, struct janus_ws_re /* connection will be released */ return size; error: + janus_ws_frag_cleanup(req); WS_CODE(sock) = ret_code; if (WS_CODE(sock) != WS_ERR_NONE) { janus_ws_send_close(sock); @@ -554,6 +758,18 @@ static int janus_connection_handler_id(janus_connection *sock, struct janus_ws_r goto error; } + /* Handle fragment accumulation before processing */ + { + int frag_ret = janus_ws_handle_frag(req); + if (frag_ret == 1) { + sock->msg_attempts = 0; + return 1; + } else if (frag_ret < 0) { + ret_code = WS_ERR_POLICY; + goto error; + } + } + size=req->tcp.pos-req->tcp.parsed; switch (req->op) { @@ -586,6 +802,7 @@ static int janus_connection_handler_id(janus_connection *sock, struct janus_ws_r sock->state = S_CONN_EOF; /* we are trying to populate the handler ID, close if not expected */ + janus_ws_frag_cleanup(req); return -1; case WS_OP_PING: @@ -599,10 +816,16 @@ static int janus_connection_handler_id(janus_connection *sock, struct janus_ws_r case WS_OP_TEXT: case WS_OP_BIN: - LM_DBG("read complete [%.*s] \n",(int)(req->tcp.parsed-req->tcp.body),req->tcp.body); + if (req->frag_buf) { + /* Reassembled fragmented message */ + LM_DBG("read reassembled [%.*s]\n", req->buf_len, req->buf); + } else { + LM_DBG("read complete [%.*s] \n", + (int)(req->tcp.parsed-req->tcp.body), req->tcp.body); - req->buf = req->tcp.body; - req->buf_len = req->tcp.parsed-req->tcp.body; + req->buf = req->tcp.body; + req->buf_len = req->tcp.parsed-req->tcp.body; + } janus_brief_parse_msg((struct janus_req *)req); @@ -619,12 +842,20 @@ static int janus_connection_handler_id(janus_connection *sock, struct janus_ws_r if (populate_janus_handler_id(sock, req->body) <0) { LM_ERR("Failed to populate handler id \n"); cJSON_Delete(req->body); + janus_ws_frag_cleanup(req); return -1; } cJSON_Delete(req->body); + janus_ws_frag_cleanup(req); /* we have populated the janus handler id */ ret = 0; + } else if (req->frag_buf) { + /* Parse failed on reassembled message -- all data was present */ + LM_ERR("failed to parse reassembled fragmented message\n"); + janus_ws_frag_cleanup(req); + ret_code = WS_ERR_BADDATA; + goto error; } else { ret = 1; } @@ -655,6 +886,7 @@ static int janus_connection_handler_id(janus_connection *sock, struct janus_ws_r /* connection will be released */ return size; error: + janus_ws_frag_cleanup(req); WS_CODE(sock) = ret_code; if (WS_CODE(sock) != WS_ERR_NONE) { janus_ws_send_close(sock); diff --git a/modules/janus/ws_common_defs.h b/modules/janus/ws_common_defs.h index bc7d16d730..c195367ce4 100644 --- a/modules/janus/ws_common_defs.h +++ b/modules/janus/ws_common_defs.h @@ -31,6 +31,11 @@ #include "janus_common.h" #include "../../lib/cJSON.h" +/* Maximum reassembled message size. Janus responses are typically + * a few KB; 256KB provides ample headroom while bounding memory + * usage per connection during reassembly. */ +#define WS_MAX_FRAG_SIZE (256 * 1024) + /* wrapper around tcp request to add ws info */ /* keep this in sync with the janus_req, this gets cast to that */ struct janus_ws_req { @@ -48,6 +53,13 @@ struct janus_ws_req { unsigned int op; unsigned int mask; unsigned int is_masked; + + /* Fragment reassembly state -- allocated in JANUS Manager process, + * persists across init_janus_ws_req() frame resets */ + char *frag_buf; /* accumulated fragment data (pkg_malloc) */ + unsigned int frag_len; /* current accumulated length */ + unsigned int frag_size; /* allocated size of frag_buf */ + unsigned int frag_op; /* original opcode of first fragment */ };