-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathecho_server.c
More file actions
143 lines (126 loc) · 3.28 KB
/
echo_server.c
File metadata and controls
143 lines (126 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include "coevt.h"
/* Number of int slots in the channel's ring buffer (size of channel.eles). */
#define CHAN_SIZE 1024
/* Number of process_io coroutines that main() spawns. */
#define WORKER_NUM 128
/*
 * Argument record handed to a coroutine entry point through its void *
 * parameter; it carries a single socket file descriptor.
 */
typedef struct fd_arg {
    int fd; /* socket descriptor the coroutine should operate on */
} fd_arg;
/*
 * Fixed-capacity ring buffer of ints used to pass accepted client
 * descriptors from the accepting coroutine to the worker coroutines.
 *
 * TODO: implement mechanism like golang channels to replace this
 */
typedef struct channel {
    int first;              /* index of the element the next take() returns */
    int last;               /* index of the slot the next put() fills */
    int cap;                /* number of slots; wrap-around modulus */
    int is_empty;           /* non-zero when empty; tells full from empty
                               when first == last */
    int eles[CHAN_SIZE];    /* element storage */
} channel;

/* The one channel instance of this file; allocated by init_channal(). */
static channel *chan;
/*
 * Allocate the file-scope channel and put it into the empty state.
 *
 * Every byte of the structure is first set to 0xFF, so each slot of eles
 * reads back as -1 until it is written; the bookkeeping fields are then
 * set explicitly.
 *
 * The process is terminated if the allocation fails, because every other
 * channel operation dereferences chan unconditionally.
 */
static void init_channal()
{
    chan = malloc(sizeof *chan);
    if (chan == NULL) {
        fprintf(stderr, "ERROR: Failed to allocate channel\n");
        exit(EXIT_FAILURE);
    }

    memset(chan, -1, sizeof *chan);
    chan->first = 0;
    chan->last = 0;
    chan->is_empty = 1;
    chan->cap = CHAN_SIZE;
}
/*
 * Append ele to the tail of the channel.
 *
 * Returns 0 on success. If the channel is full, a warning is printed,
 * the element is not stored, and -1 is returned.
 */
static int put(int ele)
{
    int slot = chan->last;
    /* first == last means either empty or full; is_empty disambiguates. */
    int full = !chan->is_empty && slot == chan->first;

    if (full) {
        printf("WARNING: channel is full, will discard a request\n");
        return -1;
    }

    chan->eles[slot] = ele;
    chan->last = (slot + 1) % chan->cap;
    chan->is_empty = 0;
    return 0;
}
/*
 * Remove and return the element at the head of the channel.
 *
 * Returns -1 when the channel is empty.
 */
static int take()
{
    int head;
    int value;

    if (chan->is_empty != 0) {
        return -1;
    }

    head = chan->first;
    value = chan->eles[head];
    chan->first = (head + 1) % chan->cap;

    /* The head catching up with the tail means the last element is gone. */
    chan->is_empty = (chan->first == chan->last) ? 1 : chan->is_empty;
    return value;
}
/*
 * Worker coroutine: repeatedly take a client descriptor from the channel
 * and echo back whatever the client sends, prefixed with the id of the
 * current coroutine, until a read fails or returns 0.
 *
 * arg is unused.
 *
 * Never returns.
 */
void process_io(void *arg)
{
    (void)arg;

    while (1) {
        char rd_buf[1024];
        char wt_buf[1024 + 128];
        int cli_fd;
        int bytes;
        int len;

        cli_fd = take();
        if (cli_fd == -1) {
            /* Nothing queued; let other coroutines run. */
            ce_yield();
            continue;
        }

        while (1) {
            /*
             * Read at most sizeof(rd_buf) - 1 bytes so that the terminator
             * written below always lands inside the buffer.
             */
            bytes = ce_read(cli_fd, rd_buf, sizeof(rd_buf) - 1);
            if (bytes <= 0) {
                printf("ERROR: Failed to read data, will close socket %d\n", cli_fd);
                ce_close(cli_fd);
                break;
            }
            rd_buf[bytes] = '\0';

            len = snprintf(wt_buf, sizeof(wt_buf),
                           "echo from server(coroutine id: %d):\n%s\n",
                           ce_cur_task(), rd_buf);
            if (len < 0) {
                /* Formatting failed; there is nothing valid to send. */
                continue;
            }
            if ((size_t)len >= sizeof(wt_buf)) {
                /* Output was truncated; send what fits in the buffer. */
                len = (int)sizeof(wt_buf) - 1;
            }

            /* The terminating NUL is sent along with the text. */
            /* NOTE(review): the result of ce_write is not checked. */
            ce_write(cli_fd, wt_buf, (size_t)len + 1);
        }
    }
}
/*
 * Acceptor coroutine: repeatedly try to accept a connection on the
 * listening socket and queue the new descriptor for the workers, yielding
 * after every attempt.
 *
 * arg points to an fd_arg holding the listening socket descriptor.
 *
 * Never returns.
 */
void process_request(void *arg)
{
    fd_arg *p_arg = (fd_arg *)arg;
    struct sockaddr_in addr;
    socklen_t len;
    int cli_fd;

    while (1) {
        /*
         * accept() treats the length argument as value-result, so it has
         * to be reset to the buffer size before every call.
         */
        memset(&addr, 0, sizeof(addr));
        len = sizeof(addr);

        cli_fd = accept(p_arg->fd, (struct sockaddr *)&addr, &len);
        if (cli_fd != -1) {
            printf("INFO: accept connection, return fd %d\n", cli_fd);
            if (put(cli_fd) != 0) {
                /*
                 * The channel is full and no worker will ever see this
                 * descriptor; close it here so it does not leak.
                 */
                close(cli_fd);
            }
        }
        ce_yield();
    }
}
int main()
{
int sock_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
fd_arg fd_t = { sock_fd };
struct sockaddr_in addr;
int w_id;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(3000);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(sock_fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
printf("ERROR: Failed to bind address for socket\n");
if (errno == EADDRINUSE) {
printf("ERROR: The port is already in use\n");
}
close(sock_fd);
return -1;
}
if (listen(sock_fd, 1024) != 0) {
printf("ERROR: Failed to listen to port\n");
close(sock_fd);
return -1;
}
init_channal();
for (w_id = 0; w_id < WORKER_NUM; w_id++) {
ce_task(process_io, NULL);
}
ce_task(process_request, &fd_t);
ce_run();
return 0;
}