Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions modules/rtpproxy/notification_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,24 @@ static int rtpproxy_io_callback(int fd, void *fs, int was_timeout)

if (len < 0) {
LM_ERR("reading from socket failed: %s\n",strerror(errno));
reactor_proc_del_fd(fd, -1, IO_FD_CLOSING);
if (notify) {
list_del(&notify->list);
if (notify->remaining)
pkg_free(notify->remaining);
pkg_free(notify);
}
shutdown(fd, SHUT_RDWR);
close(fd);
return -1;
}
if (len == 0) {
LM_DBG("closing rtpproxy notify socket\n");
reactor_del_reader(fd, -1, IO_FD_CLOSING);
reactor_proc_del_fd(fd, -1, IO_FD_CLOSING);
if (notify) {
list_del(&notify->list);
if (notify->remaining)
pkg_free(notify->remaining);
pkg_free(notify);
}
shutdown(fd, SHUT_RDWR);
Expand All @@ -251,8 +262,19 @@ static int rtpproxy_io_callback(int fd, void *fs, int was_timeout)
p = sp + 1;
left -= (sp - start) + 1;

if (notification_handler(&command) < 0)
if (notification_handler(&command) < 0) {
LM_ERR("notification_handler failed\n");
reactor_proc_del_fd(fd, -1, IO_FD_CLOSING);
if (notify) {
list_del(&notify->list);
if (notify->remaining)
pkg_free(notify->remaining);
pkg_free(notify);
}
shutdown(fd, SHUT_RDWR);
close(fd);
return -1;
}

LM_DBG("Left to process: %d\n[%.*s]\n", left, left, p);

Expand All @@ -266,6 +288,18 @@ static int rtpproxy_io_callback(int fd, void *fs, int was_timeout)
} else {
LM_WARN("dropping remaining data [%.*s]\n", (int)(end - start), start);
}
} else {
/* No remaining data - all notifications processed, close connection */
LM_DBG("All notifications processed, closing connection\n");
reactor_proc_del_fd(fd, -1, IO_FD_CLOSING);
if (notify) {
list_del(&notify->list);
if (notify->remaining)
pkg_free(notify->remaining);
pkg_free(notify);
}
shutdown(fd, SHUT_RDWR);
close(fd);
}
return 0;
}
Expand Down
13 changes: 13 additions & 0 deletions reactor_proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "cfg_reload.h"
#include "reactor.h"
#include "reactor_proc.h"
#include "mem/mem.h"


int reactor_proc_init(char *name)
Expand Down Expand Up @@ -59,12 +60,24 @@ int reactor_proc_add_fd(int fd, reactor_proc_cb_f func, void *param)

if (reactor_add_reader( fd, F_GEN_PROC, RCT_PRIO_PROC, cb)<0){
LM_CRIT("failed to add fd to reactor <%s>\n", reactor_name());
pkg_free(cb);
return -1;
}

return 0;
}

int reactor_proc_del_fd(int fd, int idx, int io_flags)
{
struct fd_map *e;

e = get_fd_map(&_worker_io, fd);
if (e && e->type == F_GEN_PROC && e->data) {
pkg_free(e->data);
e->data = NULL;
}
return reactor_del_reader(fd, idx, io_flags);
}

inline static int handle_io(struct fd_map* fm, int idx,int event_type)
{
Expand Down
5 changes: 5 additions & 0 deletions reactor_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ int reactor_proc_init(char *name);

int reactor_proc_add_fd(int fd, reactor_proc_cb_f func, void *param);

/* Remove fd from reactor and free the callback (reactor_proc_cb) allocated by
* reactor_proc_add_fd. Use this instead of reactor_del_reader() for FDs
* that were added with reactor_proc_add_fd() to avoid a PKG memory leak. */
int reactor_proc_del_fd(int fd, int idx, int io_flags);

int reactor_proc_loop(void);

#endif