• 天津大学 2024 计算机网络 TCP 课程实践
  • Lab Member:海棠未雨,梨花先雪
  • 最终成绩 93 分捏 😋

文件结构说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
tju_tcp ----------------- 项目根目录 
├─ build ---------------- 存放所有编译的中间文件
│ ├─ kernel.o
│ ├─ tju_packet.o
│ └─ tju_tcp.o
├─ inc ------------------ 存放所有头文件
│ ├─ global.h ---------- 定义一些全局都会用到的变量和结构
│ ├─ kernel.h ---------- 模拟一部分linux内核行为 比如如何发送数据到下一层 根据五元组查找socket等
│ ├─ tju_packet.h ------ 定义TCP所用到的数据包格式 提供各种数据包的操作(创建 获得字段等)
│ └─ tju_tcp.h --------- 需要实现的TCP的各种结构和功能的定义
├─ src ------------------ 存放所有源代码文件
│ ├─ client.c ---------- 客户端主函数实现
│ ├─ server.c ---------- 服务端主函数实现
│ ├─ kernel.c ---------- 虚拟内核实现
│ ├─ tju_packet.c ------ 可靠层包封装实现
│ └─ tju_tcp.c --------- 可靠层实现
├─ Makefile ------------- 生成脚本
├─ README.md ------------ 文件说明
└─ bin
├─ client
└─ server

系统环境信息

1
2
3
4
5
6
7
8
9
10
11
12
13
neofetch
--------------
OS: Ubuntu 20.04.2 LTS x86_64
Host: Oracle Corporation VirtualBox
Kernel: 5.4.0-80-generic
Uptime: 3 mins
Packages: 676 (dpkg), 4 (snap)
Shell: bash 5.0.17
Resolution: 800x600
Terminal: /dev/pts/0
CPU: 11th Gen Intel i7-11800H (2) @ 2.304GHz
GPU: VirtualBox Graphics Adapter
Memory: 294MiB / 981MiB

具体实验记录

连接管理

三次握手建立连接

1. tju_connect( )
tju_connect( )函数是三次握手的开端。在该函数中,客户端的sock绑定好本地的ip和port后向服务器端发送SYN包,同时把当前的状态变化为SYN_SENT。成功建立连接后将sock放入ESTABLISHED的hash表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int tju_connect(tju_tcp_t* sock, tju_sock_addr target_addr){
sock->established_remote_addr = target_addr;

tju_sock_addr local_addr;
local_addr.ip = inet_network(CLIENT_IP);
local_addr.port = 5678; // 连接方进行connect连接的时候 内核中是随机分配一个可用的端口
sock->established_local_addr = local_addr;

// 这里也不能直接建立连接 需要经过三次握手
// 实际在linux中 connect调用后 会进入一个while循环
// 循环跳出的条件是socket的状态变为ESTABLISHED 表面看上去就是 正在连接中 阻塞
// 而状态的改变在别的地方进行 在我们这就是tju_handle_packet
uint32_t seq = 0; // 序列号
uint32_t ack = 0; // 确认号
uint8_t flag=SYN_FLAG_MASK; //flag标记为:连接状态进行到哪一步
char *SYN = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port,seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 1, 0, NULL, 0);
sendToLayer3(SYN, DEFAULT_HEADER_LEN); //客户端向服务器端发送同步报文--第一次握手
change_sock_state(sock, SYN_SENT);
// printf("client发送SYN\n");

Timeout_retransmission(sock, ESTABLISHED, SYN, DEFAULT_HEADER_LEN); //阻塞等待,直到sock被建立
int hashval = cal_hash(sock->established_local_addr.ip, sock->established_local_addr.port,sock->established_remote_addr.ip, sock->established_remote_addr.port);
established_socks[hashval] = sock;
return 0;
}

2. Timeout_retransmission( )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Timeout_retransmission(tju_tcp_t* sock, int exp_state, char* pkt, int pktlen) { //超时重传发包函数
//printf("bg1\n");
long timeout = 100000L;
struct timeval start_time, end_time;
gettimeofday(&start_time, NULL);
while (sock->state != exp_state){
gettimeofday(&end_time, NULL);
long Time = 1000000L * (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_usec - start_time.tv_usec);
if (Time >= timeout) { //超时重传
gettimeofday(&start_time, NULL);
sendToLayer3(pkt, pktlen);
}
}
//printf("ed1\n");
}

3. change_sock_state( )

1
2
3
4
5
void change_sock_state(tju_tcp_t* sock, int state) {  //添加:sock状态改变
while (pthread_mutex_lock(&(sock->state_lock)) != 0) ; //改变sock状态前先加锁
sock->state = state;
pthread_mutex_unlock(&(sock->state_lock)); //解锁
}

4. tju_accept()
tju_accept( )函数中添加了一个阻塞,使得只有当全连接hash表中存在这一个sock,服务器端才算连接完成,才能把新的sock加入到ESTABLISHED的hash表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
tju_tcp_t *tju_accept(tju_tcp_t *listen_sock){
while (listen_sock->received_len < DEFAULT_HEADER_LEN);
char *header = malloc(DEFAULT_HEADER_LEN);
memcpy(header, listen_sock->received_buf, DEFAULT_HEADER_LEN);
memset(listen_sock->received_buf, 0, DEFAULT_HEADER_LEN);
listen_sock->received_buf = NULL;
listen_sock->received_len = listen_sock->received_len - DEFAULT_HEADER_LEN;

tju_tcp_t* new_conn = (tju_tcp_t*)malloc(sizeof(tju_tcp_t));
pthread_mutex_init(&(new_conn->state_lock), NULL);
memcpy(new_conn, listen_sock, sizeof(tju_tcp_t));

tju_sock_addr local_addr, remote_addr;
/*
这里涉及到TCP连接的建立
正常来说应该是收到客户端发来的SYN报文
从中拿到对端的IP和PORT
换句话说 下面的处理流程其实不应该放在这里 应该在tju_handle_packet中
*/
remote_addr.ip = inet_network(CLIENT_IP); //具体的IP地址
remote_addr.port = 5678; //端口

local_addr.ip = listen_sock->bind_addr.ip; //具体的IP地址
local_addr.port = listen_sock->bind_addr.port; //端口

new_conn->established_local_addr = local_addr;
new_conn->established_remote_addr = remote_addr;

while (listen_sock->state != ESTABLISHED) ;
change_sock_state(new_conn, ESTABLISHED);

// 将新的conn放到内核建立连接的socket哈希表中
int hashval = cal_hash(local_addr.ip, local_addr.port, remote_addr.ip, remote_addr.port);
established_socks[hashval] = new_conn;
return new_conn;
}

5. tju_handle_packet( )
tju_handle_packet( )函数中需要解决对各种报文的解析。onTCPPocket( )函数如果从established_hash或listen_hash中找到了对应的socket,就会调用 tju_handle_packet( )函数对收到的数据包进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
int tju_handle_packet(tju_tcp_t* sock, char* pkt){
if (sock->state == LISTEN && get_flags(pkt) == SYN_FLAG_MASK){ //第二次握手
while (pthread_mutex_lock(&(sock->recv_lock)) != 0);
if (sock->received_buf == NULL) sock->received_buf = malloc(DEFAULT_HEADER_LEN);
else sock->received_buf = realloc(sock->received_buf, sock->received_len + DEFAULT_HEADER_LEN);
memcpy(sock->received_buf + sock->received_len, pkt, DEFAULT_HEADER_LEN);
sock->received_len += DEFAULT_HEADER_LEN;
pthread_mutex_unlock(&(sock->recv_lock));

tju_sock_addr remote;
remote.ip = inet_network(CLIENT_IP);
remote.port = get_src(pkt);

sock->established_local_addr = sock->bind_addr;
sock->established_remote_addr = remote;

int hashval = cal_hash(sock->established_local_addr.ip, sock->established_local_addr.port, remote.ip, remote.port);
half_connection[hashval] = sock; //收到SYN报文后放进LISTEN的半连接队列中
established_socks[hashval] = sock; //还会将该新建的socket放到ehash中

uint32_t seq = 0;
uint32_t ack = get_seq(pkt) + 1; //确认号是SYN报文序列号+1
uint8_t flag = SYN_FLAG_MASK | ACK_FLAG_MASK; //状态记为SYN+ACK,这里采用位运算|
char *SYNACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 1, 0, NULL, 0);
sendToLayer3(SYNACK, DEFAULT_HEADER_LEN);
change_sock_state(sock, SYN_RECV);
}
else if (sock->state == SYN_SENT && get_flags(pkt) == (ACK_FLAG_MASK | SYN_FLAG_MASK)){ //第三次握手
//printf("client收到SYCACK, 准备发送ACK\n");
uint32_t seq = get_ack(pkt);
uint32_t ack = get_seq(pkt) + 1;
uint8_t flag = ACK_FLAG_MASK;
sock->window.wnd_send->base = seq;
sock->window.wnd_send->nextseq = seq;
char *ACK = create_packet_buf(sock->established_local_addr.port, get_src(pkt), seq,ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 1, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
change_sock_state(sock, ESTABLISHED);
}
else if (sock->state == SYN_SENT && get_flags(pkt) == SYN_FLAG_MASK){ //第三次握手(奇怪)
// //printf("client收到SYCACK, 准备发送ACK\n");
uint32_t seq = get_ack(pkt);
uint32_t ack = get_seq(pkt) + 1;
uint8_t flag = ACK_FLAG_MASK;
sock->window.wnd_send->base = seq;
sock->window.wnd_send->nextseq = seq;
char *ACK = create_packet_buf(sock->established_local_addr.port, get_src(pkt), seq,ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 0, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
change_sock_state(sock, ESTABLISHED);
}
else if(sock->state == ESTABLISHED && get_flags(pkt) == (SYN_FLAG_MASK | ACK_FLAG_MASK)){ //三次握手第三个包的重传
uint32_t seq = get_ack(pkt);
uint32_t ack = get_seq(pkt) + 1;
uint8_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 1, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
}
else if (sock->state == SYN_RECV && get_flags(pkt) == ACK_FLAG_MASK){
// //printf("server收到ACK\n");
sock->window.wnd_recv->expect_seq = get_seq(pkt) + get_plen(pkt) - DEFAULT_HEADER_LEN;
int hash = cal_hash(sock->established_local_addr.ip, sock->established_local_addr.port,sock->established_remote_addr.ip, sock->established_remote_addr.port);
com_connection[hash] = sock;
half_connection[hash] = NULL;
change_sock_state(sock, ESTABLISHED);
if (get_plen(pkt) > DEFAULT_HEADER_LEN) { //ACK包携带信息
pkt2buffer(sock, pkt);
}
}

6. pkt2buffer( )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void pkt2buffer(tju_tcp_t* sock, char* pkt) {
//printf("bg6\n");
uint32_t data_len = get_plen(pkt) - DEFAULT_HEADER_LEN;

while (pthread_mutex_lock(&(sock->recv_lock)) != 0); // 加锁
if (sock->received_buf == NULL){
sock->received_buf = malloc(data_len);
}
else{
sock->received_buf = realloc(sock->received_buf, sock->received_len + data_len);
}
memcpy(sock->received_buf + sock->received_len, pkt + DEFAULT_HEADER_LEN, data_len);
sock->received_len += data_len;
pthread_mutex_unlock(&(sock->recv_lock)); // 解锁
//printf("ed6\n");
}

四次挥手关闭连接

1. tju_close ( )
四次挥手有两种情况,分别为先后关闭和同时关闭,两种方式首先都需要调用tju_close( )函数,因此,在tju_close( )函数中,需要构造一个 FIN+ACK 报文,seq和ack都是当前报文窗口的下一个数值,并把自身状态变为 FIN-WAIT1。

1
2
3
4
5
6
7
8
9
10
11
12
13
int tju_close (tju_tcp_t* sock){
uint16_t flag = FIN_FLAG_MASK | ACK_FLAG_MASK;
uint32_t seq = sock->window.wnd_send->nextseq;
uint32_t ack = sock->window.wnd_send->nextseq;
char* FINACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port,seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 0, 0, NULL, 0);
sendToLayer3(FINACK, DEFAULT_HEADER_LEN);
change_sock_state(sock, FIN_WAIT_1);
sock->window.wnd_send->nextseq++;

while (sock->state != CLOSED) ;
//printf("连接完全关闭\n");
return 0;
}

2. tju_handle_packet( )
接下来分为两种情况,但处理过程均在tju_handle_packet( )中完成,tju_handle_packet( )作为对收到的包进行响应的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
int tju_handle_packet(tju_tcp_t* sock, char* pkt){
``````
``````
``````
else if (sock->state == ESTABLISHED && get_flags(pkt) == (FIN_FLAG_MASK | ACK_FLAG_MASK)){ // server收到第一次挥手,并进行第二次挥手和第三次挥手
uint32_t seq = get_ack(pkt);
uint32_t ack = get_seq(pkt) + 1;
uint16_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 0, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
//printf("发送第二次挥手报文\n");
change_sock_state(sock, CLOSE_WAIT);

sleep(1);

flag = ACK_FLAG_MASK | FIN_FLAG_MASK;
seq = sock->window.wnd_send->nextseq;
ack = get_seq(pkt) + 1;
char *FINACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 0, 0, NULL, 0);
sendToLayer3(FINACK, DEFAULT_HEADER_LEN);
//printf("发送第三次挥手报文\n");
change_sock_state(sock, LAST_ACK);
Timeout_retransmission(sock, CLOSED, FINACK, DEFAULT_HEADER_LEN);
}
else if (sock->state == ESTABLISHED && get_flags(pkt) == FIN_FLAG_MASK){ // server收到第一次挥手,并进行第二次挥手和第三次挥手
uint32_t seq = get_ack(pkt);
uint32_t ack = get_seq(pkt) + 1;
uint16_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 0, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
change_sock_state(sock, CLOSE_WAIT);
//printf("发送第二次挥手报文\n");

sleep(1);

flag = ACK_FLAG_MASK | FIN_FLAG_MASK;
char *FINACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, 0, 0, NULL, 0);
sendToLayer3(FINACK, DEFAULT_HEADER_LEN);
//printf("发送第三次挥手报文\n");
change_sock_state(sock, LAST_ACK);
Timeout_retransmission(sock, CLOSED, FINACK, DEFAULT_HEADER_LEN);
}
else if (sock->state == FIN_WAIT_1 && get_flags(pkt) == ACK_FLAG_MASK){ // client收到第二次挥手
change_sock_state(sock, FIN_WAIT_2);
//printf("收到第二次挥手报文\n");
}
else if (sock->state == FIN_WAIT_2 && (get_flags(pkt) == (ACK_FLAG_MASK | FIN_FLAG_MASK))){ // client收到第三次挥手进行第四次挥手并关闭连接
uint32_t seq = get_ack(pkt);
uint32_t ack = get_seq(pkt) + 1;
uint32_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN,flag, 0, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
//printf("收到第三次挥手报文, 并发出第四次挥手\n");
change_sock_state(sock, TIME_WAIT);
int hash = cal_hash(sock->established_local_addr.ip, sock->established_local_addr.port, sock->established_remote_addr.ip, sock->established_remote_addr.port);
//这里要等待2MSL
change_sock_state(sock, CLOSED);
established_socks[hash] = NULL;
//printf("关闭连接\n");
}
else if (sock->state == LAST_ACK && (get_flags(pkt) == ACK_FLAG_MASK)){ // server收到第四次挥手并关闭连接
int hash = cal_hash(sock->established_local_addr.ip, sock->established_local_addr.port, sock->established_remote_addr.ip, sock->established_remote_addr.port);
established_socks[hash] = NULL;
half_connection[hash] = NULL;
com_connection[hash] = NULL;
change_sock_state(sock, CLOSED);
//printf("关闭连接\n");
}
else if (sock->state == FIN_WAIT_1 && (get_flags(pkt) == (FIN_FLAG_MASK | ACK_FLAG_MASK))){ // 同时关闭收到第一次挥手
change_sock_state(sock, CLOSING);
uint32_t seq = get_ack(pkt) + 1;
uint32_t ack = get_seq(pkt) + 1;
uint16_t flag = ACK_FLAG_MASK;
char* ACK = create_packet_buf(sock->established_local_addr.port,sock->established_remote_addr.port,seq,ack,DEFAULT_HEADER_LEN,DEFAULT_HEADER_LEN,flag,0,0,NULL,0);
sendToLayer3(ACK,DEFAULT_HEADER_LEN);
Timeout_retransmission(sock, CLOSED, ACK, DEFAULT_HEADER_LEN);
// //printf("同时关闭发出ACK\n");
}
else if(sock->state == FIN_WAIT_1 && get_flags(pkt) == FIN_FLAG_MASK){ // 同时关闭收到第一次挥手
change_sock_state(sock, CLOSING);
uint32_t seq = get_ack(pkt) + 1;
uint32_t ack = get_seq(pkt) + 1;
uint16_t flag = ACK_FLAG_MASK;
char* ACK = create_packet_buf(sock->established_local_addr.port,sock->established_remote_addr.port,seq,ack,DEFAULT_HEADER_LEN,DEFAULT_HEADER_LEN,flag,0,0,NULL,0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
Timeout_retransmission(sock, CLOSED, ACK, DEFAULT_HEADER_LEN);
}
else if (sock->state == CLOSING && get_flags(pkt) == ACK_FLAG_MASK){ // 同时关闭时收到第三次挥手
change_sock_state(sock, TIME_WAIT);
//等待2MSL
int hash = cal_hash(sock->established_local_addr.ip, sock->established_local_addr.port, sock->established_remote_addr.ip, sock->established_remote_addr.port);
established_socks[hash] = NULL;
half_connection[hash] = NULL;
com_connection[hash] = NULL;
change_sock_state(sock, CLOSED);
//printf("同时关闭彻底关闭\n");
}
}

可靠数据传输

发送缓冲区管理

1. global.h 中宏定义 TCP 发送窗口大小。

1
#define TCP_SENDWN_SIZE 1000 

2. tju_packet.h 中为发送缓冲区创建1000个报文长度的循环队列,方便数据包超时重传。

1
2
3
4
5
6
7
// TCP 报文的结构定义
typedef struct {
tju_header_t header;
struct timeval sent_time;
char* data;
} tju_packet_t;
tju_packet_t* pktlist[TCP_SENDWN_SIZE];

3. tju_send( )
上层调用 tju_send( ) 时,我们在 tju_send( )函数内创建发送线程并把发送数据存入缓冲区中。
 这里我们先了解一下创建线程 pthread_create( )函数。

int pthread_create(pthread_t* restrict tidp,const pthread_attr_t* restrict_attr,void* (start_rtn)(void),void *restrict arg);

  • tidp:事先创建好的pthread_t类型的参数。成功时tidp指向的内存单元被设置为新创建线程的线程ID。
  • attr:用于定制各种不同的线程属性。APUE的12.3节讨论了线程属性。通常直接设为NULL。
  • start_rtn:新创建线程从此函数开始运行。无参数是arg设为NULL即可。
  • arg:start_rtn函数的参数。无参数时设为NULL即可。有参数时输入参数的地址。当多于一个参数时应当使用结构体传入。
  • 返回值为 0(表示线程成功创建)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int send_thread_flag = 0;

int tju_send(tju_tcp_t* sock, const void *buffer, int len){
// 这里当然不能直接简单地调用sendToLayer
if(!send_thread_flag){
send_thread_flag = 1;
pthread_t sendthreadid;
int ret = pthread_create(&sendthreadid, NULL, send_pkt, (void *)sock);
}

while (pthread_mutex_lock(&(sock->send_lock)) != 0) ;
memcpy(sock->sending_buf + sock->index, (char*)buffer, len);
sock->sending_len += len;
sock->index += len;
pthread_mutex_unlock(&(sock->send_lock));

return 0;
}

4. send_pkt( )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void *send_pkt(tju_tcp_t* sock){
//printf("bg4\n");
//struct timeval start_time, now_time;
//gettimeofday(&start_time, NULL);
//long deltatime = 0; //时间差
while (1){ //不断检查和发送数据包
if (sock->window.wnd_send->nextseq < sock->index){ //检查发送窗口:检查当前发送序列号是否小于索引,确保有数据需要发送。
//gettimeofday(&now_time, NULL);
//deltatime = now_time.tv_sec - start_time.tv_sec;

while (pthread_mutex_lock(&(sock->send_lock)) != 0);
int unlen = sock->sending_len - (sock->window.wnd_send->nextseq - sock->window.wnd_send->base); //计算当前未发送的数据长度。
int len = min(unlen, MAX_DLEN); //确定本次发送的长度,取未发送长度和最大数据长度中的最小值。
pthread_mutex_unlock(&(sock->send_lock));
int leftlen = sock->window.wnd_send->rwnd - (sock->window.wnd_send->nextseq - sock->window.wnd_send->base);
len =len>leftlen?leftlen:len;

if (sock->window.wnd_send->nextseq - sock->window.wnd_send->base + len <= sock->window.wnd_send->rwnd) { //检查窗口大小:检查当前已发送的长度加上本次发送长度是否在接收窗口范围内。
uint32_t seq = sock->window.wnd_send->nextseq;
uint32_t ack = seq + len; //不理解
uint8_t flag = ACK_FLAG_MASK;
uint16_t pkt_len = len + DEFAULT_HEADER_LEN;
char *data = (char *)malloc(MAX_DLEN);
memcpy(data, sock->sending_buf + sock->window.wnd_send->nextseq, len);
tju_packet_t *datapacket = create_packet(sock->established_local_addr.port, sock->established_remote_addr.port,seq, ack, DEFAULT_HEADER_LEN, pkt_len, flag, 0, 0, data, len);
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port,seq, ack, DEFAULT_HEADER_LEN, pkt_len, flag, 0, 0, data, len);

sendToLayer3(ACK, pkt_len);

sock->window.wnd_send->nextseq += len;
pktlist[sock->packetr] = datapacket;
gettimeofday(&pktlist[sock->packetr]->sent_time, NULL);
sock->packetr = (sock->packetr + 1) % TCP_SENDWN_SIZE;
if (len == (int)(sock->window.wnd_send->nextseq - sock->window.wnd_send->base)){
gettimeofday(&(sock->window.wnd_send->send_time), NULL);

while(pthread_mutex_lock(&(sock->window.wnd_send->ack_cnt_lock)) != 0);
sock->window.wnd_send->ack_cnt = 0;
pthread_mutex_unlock(&(sock->window.wnd_send->ack_cnt_lock));
}
}
}
}
printf("ed4\n");
}

失序报文数组的管理

1. tju_handle_packet( )
tju_handle_packet( )中添加建立连接后收到数据包的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int tju_handle_packet(tju_tcp_t* sock, char* pkt){
`````` 建立连接 ``````
````````````````````
````````````````````
else if(sock->state == ESTABLISHED && get_flags(pkt) == ACK_FLAG_MASK){ //收到数据包
char hostname[8];
gethostname(hostname, 8);
if (strcmp(hostname, "server") == 0){
serverrdt(sock, pkt);
}
else{
clientrdt(sock, pkt);
}
}
`````` 关闭连接 ``````
````````````````````
````````````````````
}

2. tju_tcp_t 结构体中创建空间用来存储失序报文并记录失序报文的个数。

1
2
3
4
5
// TJU_TCP 结构体 保存TJU_TCP用到的各种数据
typedef struct {
char unorder[100][MAX_LEN]; //失序报文数组
int unolen; //失序报文个数
} tju_tcp_t;

3. serverrdt( )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void serverrdt(tju_tcp_t* sock, char* pkt){ // 服务器端处理数据包
if (get_seq(pkt) < (sock->window.wnd_recv->expect_seq)){ //函数发送一个 ACK 响应已确认接收
uint32_t seq = sock->window.wnd_send->nextseq;
uint32_t ack = sock->window.wnd_recv->expect_seq;
uint8_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, TCP_RECVWN_SIZE-(sock->unolen)*MAX_DLEN, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
}
else if(get_seq(pkt) > (sock->window.wnd_recv->expect_seq)){ //数据包乱序,函数将包暂存,并发送 ACK,同时更新接收窗口
uint32_t seq = sock->window.wnd_send->nextseq;
uint32_t ack = sock->window.wnd_recv->expect_seq;
uint8_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, TCP_RECVWN_SIZE-(sock->unolen)*MAX_DLEN, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);

if((sock->unolen) > MAX_PKT_IN_WINDOW) return;
memcpy(sock->unorder[sock->unolen], pkt, get_plen(pkt));
sock->unolen++;
}
else{ //刚刚好
sock->window.wnd_send->nextseq=get_ack(pkt);
sock->window.wnd_recv->expect_seq+=get_plen(pkt)-DEFAULT_HEADER_LEN;
pkt2buffer(sock,pkt);
if(sock->unolen == 0){
uint32_t seq = sock->window.wnd_send->nextseq;
uint32_t ack = sock->window.wnd_recv->expect_seq;
uint8_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag,TCP_RECVWN_SIZE-(sock->unolen)*MAX_DLEN, 0, NULL, 0);
sendToLayer3(ACK, DEFAULT_HEADER_LEN);
}
else{
int len = sock->unolen;

for(int i = 0; i < len; i++){
for(int j = i + 1; j < len; j++){
if(get_seq(sock->unorder[i])>get_seq(sock->unorder[j])){
my_swap(&sock->unorder[i], &sock->unorder[j]);
}
}
}
int index = 0;
for(; index < len; index++){
if(get_seq(sock->unorder[index]) == sock->window.wnd_recv->expect_seq){
sock->window.wnd_send->nextseq = get_ack(sock->unorder[index]);
sock->window.wnd_recv->expect_seq += get_plen(sock->unorder[index]) - DEFAULT_HEADER_LEN;
pkt2buffer(sock, sock->unorder[index]);
}
else if(get_seq(sock->unorder[index]) < (sock->window.wnd_recv->expect_seq)) continue;
else break;
}
for (int i = index; i < len; i++){
memcpy(sock->unorder[i - index], sock->unorder[i], sizeof(sock->unorder[i]));
}
sock->unolen -= index;

uint32_t seq = sock->window.wnd_send->nextseq;
uint32_t ack = sock->window.wnd_recv->expect_seq;
uint8_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, TCP_RECVWN_SIZE-(sock->unolen)*MAX_DLEN, 0, NULL, 0);

sendToLayer3(ACK, DEFAULT_HEADER_LEN);
}
}
}

4. my_swap( )

1
2
3
4
5
6
7
void my_swap(char** a, char** b){
char* tem = (char*)malloc(MAX_LEN);
memcpy(tem, a, get_plen(b));
memcpy(a, b, get_plen(b));
memcpy(b, tem, get_plen(tem));
free(tem);
}

实现累计应答

Figure 1. 累计应答

1. clientrdt( )
clientrdt( )函数中添加累计应答处理。当发送方收到任意大于当前base的ack报文后,直接把当前base进行更新即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void clientrdt(tju_tcp_t* sock, char* pkt){ // 客户端处理数据包
if (sock->window.wnd_send->base == sock->window.wnd_send->nextseq) return;
if (get_ack(pkt) < sock->window.wnd_send->base) return;
``````
``````
``````
if(get_ack(pkt) > sock->window.wnd_send->base){
``````
``````
``````
while(pthread_mutex_lock(&(sock->send_lock)) != 0); //加锁
while(sock->window.wnd_send->base < get_ack(pkt) && sock->window.wnd_send->base < sock->window.wnd_send->nextseq ){
int pkt_len = pktlist[sock->packetl]->header.plen;
int datalen = pkt_len - DEFAULT_HEADER_LEN;
sock->sending_len -= datalen;
sock->window.wnd_send->base += datalen;
sock->packetl = (sock->packetl + 1) % TCP_SENDWN_SIZE;
}
pthread_mutex_unlock(&(sock->send_lock)); //解锁
}
``````
``````
``````
}

超时重传

1. tju_send( )
tju_send( ) 中创建了一个新的线程判断是否要进行重传。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int resend_thread_flag = 0;

int tju_send(tju_tcp_t* sock, const void *buffer, int len){
``````
``````
``````
if (!resend_thread_flag){
resend_thread_flag = 1;
pthread_t resendthreadid;
int ret = pthread_create(&resendthreadid, NULL,resend_pkt, (void *)sock);
}
``````
``````
``````
}

2. resend_pkt( )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void *resend_pkt(tju_tcp_t* sock){
struct timeval start_time,now_time;
gettimeofday(&start_time, NULL);
long deltatime = 0;
while (1){
if (sock->window.wnd_send->base < sock->window.wnd_send->nextseq){
gettimeofday(&now_time, NULL);
deltatime = now_time.tv_sec - start_time.tv_sec;
if (deltatime > 100) break; //不理解
long nowtimeval = 1000000 * (now_time.tv_sec - sock->window.wnd_send->send_time.tv_sec) + (now_time.tv_usec - sock->window.wnd_send->send_time.tv_usec);
long timeoutval = 1000000 * (sock->window.wnd_send->timeout.tv_sec) + (sock->window.wnd_send->timeout.tv_usec);
if (nowtimeval > timeoutval){ //超时重传
while(pktlist[sock->packetl] == NULL);
uint32_t seq = pktlist[sock->packetl]->header.seq_num;
uint32_t ack = pktlist[sock->packetl]->header.ack_num;
uint8_t flag = ACK_FLAG_MASK;
int pkt_len = pktlist[sock->packetl]->header.plen;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack,DEFAULT_HEADER_LEN, pkt_len, flag, 0, 0, pktlist[sock->packetl]->data, pkt_len - DEFAULT_HEADER_LEN);
sendToLayer3(ACK, pkt_len);
gettimeofday(&pktlist[sock->packetl]->sent_time, NULL);
gettimeofday(&sock->window.wnd_send->send_time, NULL);
}
}
}
}

3. clientrdt( )
clientrdt( ) 函数中添加动态设置超时间隔的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void clientrdt(tju_tcp_t* sock, char* pkt){ // 客户端处理数据包
``````
``````
``````
if(get_ack(pkt) > sock->window.wnd_send->base){
//更新RTO
struct timeval nowtime;
gettimeofday(&nowtime, NULL);
long ntime = 1000000 * nowtime.tv_sec + nowtime.tv_usec;
long samplertt = 1000000 * (nowtime.tv_sec-pktlist[sock->packetl]->sent_time.tv_sec)+nowtime.tv_usec-pktlist[sock->packetl]->sent_time.tv_usec;
long estmatedrtt = sock->window.wnd_send->estmated_rtt;
long devrtt = sock->window.wnd_send->dev_rtt;
estmatedrtt = 1.0 * estmatedrtt * 7 / 8 + samplertt / 8;
devrtt = 1.0 * devrtt * 3 / 4 + (samplertt > 1.0 * estmatedrtt ? (samplertt - estmatedrtt) : (estmatedrtt - samplertt)) / 4;
sock->window.wnd_send->timeout.tv_usec = (estmatedrtt + 4 * devrtt) % 1000000;
sock->window.wnd_send->timeout.tv_sec = (estmatedrtt + 4 * devrtt) / 1000000;
sock->window.wnd_send->estmated_rtt = estmatedrtt;
sock->window.wnd_send->dev_rtt = devrtt;
double dsamplertt=(double)samplertt/(double)1000000;
double destmatedrtt=(double)estmatedrtt/(double)1000000;
double ddevrtt=(double)devrtt/(double)1000000;
double dtimegap=(double)(estmatedrtt + 4 * devrtt)/(double)1000000;

while(pthread_mutex_lock(&(sock->window.wnd_send->ack_cnt_lock)) != 0);
sock->window.wnd_send->ack_cnt = 1;
pthread_mutex_unlock(&(sock->window.wnd_send->ack_cnt_lock));
``````
``````
``````
}
}

快速重传

Figure 1. 快速重传

1. clientrdt( )
clientrdt( ) 函数中添加快速重传的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void clientrdt(tju_tcp_t* sock, char* pkt){ // 客户端处理数据包
``````
``````
``````
if(get_ack(pkt) == sock->window.wnd_send->base){ //冗余ACK
while(pthread_mutex_lock(&(sock->window.wnd_send->ack_cnt_lock)) != 0);
sock->window.wnd_send->ack_cnt++;
pthread_mutex_unlock(&(sock->window.wnd_send->ack_cnt_lock));

if(sock->window.wnd_send->ack_cnt == 3){ //快速重传
``````
uint32_t seq = pktlist[sock->packetl]->header.seq_num;
uint32_t ack = pktlist[sock->packetl]->header.ack_num;
uint8_t flag = ACK_FLAG_MASK;
int pkt_len = pktlist[sock->packetl]->header.plen;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, pkt_len, ACK_FLAG_MASK, 0, 0, pktlist[sock->packetl]->data, pkt_len - DEFAULT_HEADER_LEN);
sendToLayer3(ACK, pkt_len);
sock->window.wnd_send->ack_cnt = 0;
gettimeofday(&pktlist[sock->packetl]->sent_time, NULL);
}
return;
}
``````
``````
``````
}

流量控制

接收方计算接受缓冲区大小

1. serverrdt( )

1
2
3
4
5
6
7
8
void serverrdt(tju_tcp_t* sock, char* pkt){// 服务端处理数据包
``````
uint32_t seq = sock->window.wnd_send->nextseq;
uint32_t ack = sock->window.wnd_recv->expect_seq;
uint8_t flag = ACK_FLAG_MASK;
char *ACK = create_packet_buf(sock->established_local_addr.port, sock->established_remote_addr.port, seq, ack, DEFAULT_HEADER_LEN, DEFAULT_HEADER_LEN, flag, TCP_RECVWN_SIZE-(sock->unolen)*MAX_DLEN, 0, NULL, 0);
``````
}

发送方调整发送缓冲区大小

1. clientrdt( )
clientrdt( )函数确定window_size大小。

1
2
3
4
5
void clientrdt(tju_tcp_t *sock, char *pkt){
``````
``````get_advertised_window(pkt)``````
``````
}

2. get_advertised_window( )

1
2
3
4
5
6
uint16_t get_advertised_window(char* msg){
int offset = 17;
uint16_t var;
memcpy(&var, msg+offset, SIZE16);
return ntohs(var);
}

0窗口探测

1. send_pkt( )
send_pkt( )发送数据的线程中加入一个判断条件。
0窗口探测用于在ADVERTISED WINDOW为0的情况下,发送大小为1的数据报文以获得实时的窗口返回值。

1
2
3
4
5
6
7
8
9
10
11
12
void *send_pkt(tju_tcp_t* sock){
``````
if((sock->window.wnd_send->nextseq > sock->window.wnd_send->base)&&sock->window.wnd_send->rwnd == 0){
len = 1;
}
if (len > 0){
``````
``````
``````
}
``````
}

拥塞控制

慢启动

1. 套接字的初始化使得发送方从其他状态进入慢启动状态。

1
2
3
tju_tcp_t* tju_socket(){
sock->window.wnd_send->congestion_status = SLOW_START;
}

2. resend_pkt( )函数超时事件使得发送方从其他状态进入慢启动状态。

1
2
3
if (nowtimeval > timeoutval){
sock->window.wnd_send->congestion_status = SLOW_START;
}

在慢启动状态下,发送方每当接收到正确的 ACK 报文,就会将其拥塞窗口增大 1 个 MSS(MAX_DLEN)。虽然是不断的自增 MSS,但是由于拥塞窗口的增大(进而导致发送窗口的增大),每次自增的次数为 1->2->4->8 直至达到ssthresh,所以拥塞窗口整体上呈现指数增长的趋势。
3. clientrdt( )
clientrdt( ) 函数中发送方每当接收到正确的 ACK 报文,就会将其拥塞窗口增大 1 个 MSS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void clientrdt(tju_tcp_t *sock, char *pkt){   
``````
if(sock->window.wnd_send->congestion_status == SLOW_START){ //慢启动
``````
else if(get_ack(pkt) > sock->window.wnd_send->base){ // new ack
sock->window.wnd_send->cwnd += MSS;

while(pthread_mutex_lock(&(sock->window.wnd_send->ack_cnt_lock)) != 0);
sock->window.wnd_send->ack_cnt = 0;
pthread_mutex_unlock(&(sock->window.wnd_send->ack_cnt_lock));
``````
}
``````
}
``````
}

4. resend_pkt( )
resend_pkt( ) 函数中处理超时重传导致慢启动参数变化。

1
2
3
4
5
6
7
8
9
10
void *resend_pkt(tju_tcp_t* sock){
``````
if (nowtimeval > timeoutval){
sock->window.wnd_send->ssthresh = sock->window.wnd_send->cwnd / 2; //拥塞阈值变成拥塞窗口大小的一半
if(sock->window.wnd_send->ssthresh < MAX_DLEN) sock->window.wnd_send->ssthresh = MSS; //调整
sock->window.wnd_send->cwnd = MSS; //拥塞窗口变成1MSS
sock->window.wnd_send->congestion_status = SLOW_START; //拥塞状态不变
}
``````
}

拥塞避免

1. 拥塞窗口超过 ssthresh 进入拥塞避免状态。

1
2
3
4
5
6
7
8
9
10
11
void clientrdt(tju_tcp_t *sock, char *pkt){ 
``````
if(sock->window.wnd_send->congestion_status == SLOW_START){ //慢启动
``````
if(sock->window.wnd_send->cwnd >= sock->window.wnd_send->ssthresh){ //cwnd达到ssthresh,转为拥塞避免状态
sock->window.wnd_send->congestion_status = CONGESTION_AVOIDANCE;
}
``````
}
``````
}

2. 快速恢复阶段收到正确的 ACK 使得发送方从其他状态进入拥塞避免状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void clientrdt(tju_tcp_t *sock, char *pkt){ 
``````
else if (sock->window.wnd_send->congestion_status == FAST_RECOVERY){ //快速恢复
``````
else if(get_ack(pkt) > sock->window.wnd_send->base){ //new ack
sock->window.wnd_send->cwnd = sock->window.wnd_send->ssthresh;

while(pthread_mutex_lock(&(sock->window.wnd_send->ack_cnt_lock)) != 0);
sock->window.wnd_send->ack_cnt = 0;
pthread_mutex_unlock(&(sock->window.wnd_send->ack_cnt_lock));

sock->window.wnd_send->window_size = min(get_advertised_window(pkt), sock->window.wnd_send->cwnd);

sock->window.wnd_send->congestion_status = CONGESTION_AVOIDANCE;
}
}
``````
}

3. 在拥塞避免状态下,发送方每当收到一个正确的 ACK 报文,拥塞窗口就会增大(1/cwnd)个 MSS。拥塞窗口整体上呈现线性增长的趋势。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void clientrdt(tju_tcp_t *sock, char *pkt){   
``````
else if (sock->window.wnd_send->congestion_status == CONGESTION_AVOIDANCE) { //拥塞避免
``````
else if(get_ack(pkt) > sock->window.wnd_send->base){ //new ack
sock->window.wnd_send->cwnd += MSS * ((double)MSS / (double)sock->window.wnd_send->cwnd);

while(pthread_mutex_lock(&(sock->window.wnd_send->ack_cnt_lock)) != 0);
sock->window.wnd_send->ack_cnt = 0;
pthread_mutex_unlock(&(sock->window.wnd_send->ack_cnt_lock));

sock->window.wnd_send->window_size = min(get_advertised_window(pkt), sock->window.wnd_send->cwnd);
}
}
``````
}

快速恢复

1. clientrdt( ) 发送方收到 3 个冗余 ACK 时,进入快速恢复状态。

1
2
3
4
5
6
if(sock->window.wnd_send->ack_cnt == 3){  //快速重传
sock->window.wnd_send->ssthresh = sock->window.wnd_send->cwnd / 2;
sock->window.wnd_send->cwnd = sock->window.wnd_send->ssthresh + 3 * MSS;
sock->window.wnd_send->congestion_status = FAST_RECOVERY; //快速恢复
``````
}

2. 在快速恢复状态下,如果还是收到冗余 ACK,那么依然在此状态,cwnd+=MSS;当收到正确 ACK 时,则进入拥塞避免状态 ,cwnd=ssthresh;当超时时,回到慢启动,cwnd=1MSS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void clientrdt(tju_tcp_t *sock, char *pkt){ 
``````
``````
``````
else if (sock->window.wnd_send->congestion_status == FAST_RECOVERY){ //快速恢复
if(get_ack(pkt) == sock->window.wnd_send->base){//dup ack
while(pthread_mutex_lock(&(sock->window.wnd_send->ack_cnt_lock)) != 0);
sock->window.wnd_send->ack_cnt++;
pthread_mutex_unlock(&(sock->window.wnd_send->ack_cnt_lock));

sock->window.wnd_send->cwnd += MSS;

sock->window.wnd_send->window_size = min(get_advertised_window(pkt), sock->window.wnd_send->cwnd);

sock->window.wnd_send->congestion_status = FAST_RECOVERY;
return;
}
else if(get_ack(pkt) > sock->window.wnd_send->base){ //new ack
sock->window.wnd_send->cwnd = sock->window.wnd_send->ssthresh;

while(pthread_mutex_lock(&(sock->window.wnd_send->ack_cnt_lock)) != 0);
sock->window.wnd_send->ack_cnt = 0;
pthread_mutex_unlock(&(sock->window.wnd_send->ack_cnt_lock));

sock->window.wnd_send->window_size = min(get_advertised_window(pkt), sock->window.wnd_send->cwnd);

sock->window.wnd_send->congestion_status = CONGESTION_AVOIDANCE;
}
}
``````
``````
``````
}