Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cmake/Modules/ValkeySetup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,30 @@ else ()
set(USE_RDMA 0)
endif ()

# Systemd support (auto-detect by default, matching Make build behaviour)
# Pass -DUSE_SYSTEMD=no to disable, -DUSE_SYSTEMD=yes to require.
if (NOT "${USE_SYSTEMD}" STREQUAL "no")
find_package(PkgConfig QUIET)
if (PKG_CONFIG_FOUND)
pkg_check_modules(LIBSYSTEMD libsystemd)
if (LIBSYSTEMD_FOUND)
add_valkey_server_compiler_options("-DHAVE_LIBSYSTEMD")
list(APPEND SERVER_LIBS "${LIBSYSTEMD_LIBRARIES}")
message(STATUS "Building with systemd support: ${LIBSYSTEMD_LIBRARIES}")
elseif ("${USE_SYSTEMD}" STREQUAL "yes")
message(FATAL_ERROR "USE_SYSTEMD=yes but libsystemd was not found")
else ()
message(STATUS "libsystemd not found, building without systemd support")
endif ()
elseif ("${USE_SYSTEMD}" STREQUAL "yes")
message(FATAL_ERROR "USE_SYSTEMD=yes but pkg-config is not available")
else ()
message(STATUS "pkg-config not found, building without systemd support")
endif ()
else ()
message(STATUS "Systemd support disabled")
endif ()

set(BUILDING_ARM64 0)
set(BUILDING_ARM32 0)

Expand Down
2 changes: 1 addition & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1545,7 +1545,7 @@ void clusterInitLast(void) {
listener->bindaddr_count = server.bindaddr_count;
listener->port = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
listener->ct = connTypeOfCluster();
if (connListen(listener) == C_ERR) {
if (adoptInheritedFdsForListener(listener, CONN_TYPE_CLUSTER_BUS) == 0 && connListen(listener) == C_ERR) {
/* Note: the following log text is matched by the test suite. */
serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", listener->port);
exit(1);
Expand Down
3 changes: 2 additions & 1 deletion src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ struct connListener {
int bindaddr_count;
int port;
ConnectionType *ct;
void *priv; /* used by connection type specified data */
void *priv; /* used by connection type specified data */
int inherited; /* 1 if fds were passed by systemd socket activation */
};

/* The connection module does not deal with listening and accepting sockets,
Expand Down
136 changes: 134 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
#include <sys/mman.h>
#endif

#ifdef HAVE_LIBSYSTEMD
#include <systemd/sd-daemon.h>
#endif

#if defined(HAVE_SYSCTL_KIPC_SOMAXCONN) || defined(HAVE_SYSCTL_KERN_SOMAXCONN)
#include <sys/sysctl.h>
#endif
Expand Down Expand Up @@ -3190,7 +3194,8 @@ void initListeners(void) {
listener = &server.listeners[j];
if (listener->ct == NULL) continue;

if (connListen(listener) == C_ERR) {
if (adoptInheritedFdsForListener(listener, listener->ct->get_type()) == 0 &&
connListen(listener) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port,
getConnectionTypeName(listener->ct->get_type()));
exit(1);
Expand Down Expand Up @@ -4667,7 +4672,8 @@ void closeListeningSockets(int unlink_unix_socket) {

if (server.cluster_enabled)
for (j = 0; j < server.clistener.count; j++) close(server.clistener.fd[j]);
if (unlink_unix_socket && server.unixsocket) {
if (unlink_unix_socket && server.unixsocket &&
!server.listeners[CONN_TYPE_UNIX].inherited) {
serverLog(LL_NOTICE, "Removing the unix socket file.");
if (unlink(server.unixsocket) != 0)
serverLog(LL_WARNING, "Error removing the unix socket file: %s", strerror(errno));
Expand Down Expand Up @@ -6946,6 +6952,18 @@ connListener *listenerByType(int type) {

/* Close original listener, re-create a new listener from the updated bind address & port */
int changeListener(connListener *listener) {
/* Refuse runtime reconfiguration for listeners inherited from systemd
* socket activation: closing and re-binding here would relinquish the
* fd that systemd holds, and the subsequent bind() can fail or race.
* Adjust the systemd .socket unit instead. */
if (listener->inherited) {
serverLog(LL_WARNING,
"Cannot reconfigure %s listener at runtime: the listening socket was inherited "
"from systemd socket activation. Adjust the systemd .socket unit instead.",
getConnectionTypeName(listener->ct->get_type()));
return C_ERR;
}

/* Close old servers */
connCloseListener(listener);

Expand Down Expand Up @@ -7371,6 +7389,118 @@ int serverCommunicateSystemd(const char *sd_notify_msg) {
#endif
}

/* ============== Systemd socket activation ============== */

#ifdef HAVE_LIBSYSTEMD
struct sdFdEntry {
int fd;
int conn_type;
};

static struct sdFdEntry sd_inherited_fds[CONFIG_BINDADDR_MAX + 1];
static int sd_inherited_fd_count = 0;

/* Return 1 if `inherited_ip` (as returned by anetFdToString) on family
* `inherited_family` is accepted by any entry in server.bindaddr[].
* Valkey wildcards `*` and `::*` match all IPv4 / IPv6 addresses respectively.
* Other entries must match the inherited address string exactly. */
static int sdInheritedAddrMatchesBindaddr(int inherited_family, const char *inherited_ip) {
if (server.bindaddr_count == 0) return 0;
for (int i = 0; i < server.bindaddr_count; i++) {
const char *ba = server.bindaddr[i];
if (!strcmp(ba, "*")) {
if (inherited_family == AF_INET) return 1;
} else if (!strcmp(ba, "::*")) {
if (inherited_family == AF_INET6) return 1;
} else if (!strcmp(ba, inherited_ip)) {
return 1;
}
}
return 0;
}

void inheritSystemdListenFds(void) {
/* Pass 1 so LISTEN_PID/LISTEN_FDS are unset, keeping children unaffected. */
int n = sd_listen_fds(1);
if (n <= 0) return;

for (int i = 0; i < n; i++) {
int fd = SD_LISTEN_FDS_START + i;
int conn_type = -1;

if (sd_is_socket_unix(fd, SOCK_STREAM, 1, NULL, 0) > 0) {
if (server.unixsocket) {
struct sockaddr_un sa;
socklen_t salen = sizeof(sa);
if (getsockname(fd, (struct sockaddr *)&sa, &salen) == 0 &&
strcmp(sa.sun_path, server.unixsocket) == 0)
conn_type = CONN_TYPE_UNIX;
}
} else if (sd_is_socket_inet(fd, AF_UNSPEC, SOCK_STREAM, 1, 0) > 0) {
struct sockaddr_storage sa;
socklen_t salen = sizeof(sa);
int port = 0;
char ip[NET_IP_STR_LEN];
if (getsockname(fd, (struct sockaddr *)&sa, &salen) == 0 &&
anetFdToString(fd, ip, sizeof(ip), &port, 0) == 0 && port > 0) {
int matches_bindaddr = sdInheritedAddrMatchesBindaddr(sa.ss_family, ip);
if (server.cluster_enabled && matches_bindaddr &&
(port == server.cluster_port ||
(server.cluster_port == 0 && port == server.port + CLUSTER_PORT_INCR)))
conn_type = CONN_TYPE_CLUSTER_BUS;
else if (server.tls_port && port == server.tls_port && matches_bindaddr)
conn_type = CONN_TYPE_TLS;
else if (server.port && port == server.port && matches_bindaddr)
conn_type = CONN_TYPE_SOCKET;
}
}

if (conn_type >= 0 && sd_inherited_fd_count < (int)(sizeof(sd_inherited_fds) / sizeof(sd_inherited_fds[0]))) {
sd_inherited_fds[sd_inherited_fd_count].fd = fd;
sd_inherited_fds[sd_inherited_fd_count].conn_type = conn_type;
sd_inherited_fd_count++;
} else {
serverLog(LL_WARNING,
"Systemd socket activation: fd %d doesn't match any configured listener, closing.", fd);
close(fd);
}
}
}

int adoptInheritedFdsForListener(connListener *listener, int sd_conn_type) {
int adopted = 0;
for (int i = 0; i < sd_inherited_fd_count; i++) {
if (sd_inherited_fds[i].conn_type != sd_conn_type) continue;
int fd = sd_inherited_fds[i].fd;
anetNonBlock(NULL, fd);
anetCloexec(fd);
listener->fd[listener->count++] = fd;
adopted++;
}
if (adopted > 0) {
listener->inherited = 1;
if (sd_conn_type == CONN_TYPE_UNIX)
serverLog(LL_NOTICE, "Systemd socket activation: adopted %d fd(s) for unix listener on path %s", adopted,
server.unixsocket ? server.unixsocket : "");
else if (sd_conn_type == CONN_TYPE_CLUSTER_BUS)
serverLog(LL_NOTICE, "Systemd socket activation: adopted %d fd(s) for cluster bus listener on port %d",
adopted, listener->port);
else
serverLog(LL_NOTICE, "Systemd socket activation: adopted %d fd(s) for %s listener on port %d", adopted,
getConnectionTypeName(sd_conn_type), listener->port);
}
return adopted;
}
#else
void inheritSystemdListenFds(void) {
}
int adoptInheritedFdsForListener(connListener *listener, int sd_conn_type) {
UNUSED(listener);
UNUSED(sd_conn_type);
return 0;
}
#endif /* HAVE_LIBSYSTEMD */

/* Attempt to set up upstart supervision. Returns 1 if successful. */
static int serverSupervisedUpstart(void) {
const char *upstart_job = getenv("UPSTART_JOB");
Expand Down Expand Up @@ -7658,6 +7788,8 @@ __attribute__((weak)) int main(int argc, char **argv) {

/* Daemonize if needed */
server.supervised = serverIsSupervised(server.supervised_mode);
/* Must be called after config is loaded and before initListeners(). */
inheritSystemdListenFds();
int background = server.daemonize && !server.supervised;
if (background) {
/* We need to reset server.pid after daemonize(), otherwise the
Expand Down
5 changes: 5 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2836,6 +2836,11 @@ long long serverPopcount(void *s, long count);
int serverSetProcTitle(char *title);
int validateProcTitleTemplate(const char *templ);
int serverCommunicateSystemd(const char *sd_notify_msg);
void inheritSystemdListenFds(void);
/* Sentinel for the cluster bus in adoptInheritedFdsForListener(), since the
* cluster bus has no ConnectionType and isn't in server.listeners[]. */
#define CONN_TYPE_CLUSTER_BUS CONN_TYPE_MAX
int adoptInheritedFdsForListener(connListener *listener, int sd_conn_type);
void serverSetCpuAffinity(const char *cpulist);
void dictVanillaFree(void *val);

Expand Down
Loading