Analysis of conntrack Communication Principles

Overview

As mentioned in the earlier article "Netfilter框架研究" (a study of the Netfilter framework), the connection tracking entries of the current system can be viewed with cat /proc/net/nf_conntrack or with conntrack -L -o extended. For example:

ipv4 2 tcp 6 431916 ESTABLISHED src=172.22.44.167 dst=172.22.44.196 sport=44972 dport=18012 src=172.22.44.196 dst=172.22.44.167 sport=18012 dport=44972 [ASSURED] mark=0 zone=0 use=2

However, most material found online obtains this information by registering a hook function at NF_INET_LOCAL_IN. Since conntrack information can be read directly from user space, there is no need to register such an NF_INET_LOCAL_IN hook at all.
This article explores how to obtain connection tracking information from user space.

代码分析

Let us take conntrack-go as an example to see how conntrack retrieval is implemented, basing the analysis on the repository's first commit. We start with the program's entry file, conntrack-agent.go:

func main() {
    h, err := lib.NewHandle(unix.NETLINK_NETFILTER)
    if err != nil {
        log.Fatalln("failed to create Handle..ERROR:", err)
    }
    err = h.ConntrackTableFlush(lib.ConntrackTable)
    if err != nil {
        log.Fatalln("failed to flush conntrack table..ERROR:", err)
    }
    for {
        flows, err := h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))
        if err == nil {
            if len(flows) != 0 {
                for _, flow := range flows {
                    fmt.Println(flow)
                }
            }
        }
        <-time.After(time.Millisecond * 50)
    }
}

The main function boils down to three steps:

  1. h,err := lib.NewHandle(unix.NETLINK_NETFILTER)
  2. h.ConntrackTableFlush(lib.ConntrackTable)
  3. h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))

Stepping into the NewHandle(...) function:

func NewHandle(nlFamilies ...int) (*Handle, error) {
    return newHandle(None(), None(), nlFamilies...)
}

func newHandle(newNs, curNs NsHandle, nlFamilies ...int) (*Handle, error) {
    h := &Handle{sockets: map[int]*SocketHandle{}}
    fams := SupportedNlFamilies
    if len(nlFamilies) != 0 {
        fams = nlFamilies
    }
    for _, f := range fams {
        s, err := GetNetlinkSocketAt(newNs, curNs, f)
        if err != nil {
            return nil, err
        }
        h.sockets[f] = &SocketHandle{Socket: s}
    }
    return h, nil
}

The newNs and curNs parameters allow the socket to be opened in the network namespace newNs before switching back to curNs, i.e. they support creating the socket in another network namespace. That is unnecessary for our purposes, so both are passed as None(). The program goes straight on to GetNetlinkSocketAt:

// GetNetlinkSocketAt opens a netlink socket in the network namespace newNs
// and positions the thread back into the network namespace specified by curNs,
// when done. If curNs is closed, the function derives the current namespace and
// moves back into it when done. If newNs is closed, the socket will be opened
// in the current network namespace.
func GetNetlinkSocketAt(newNs, curNs NsHandle, protocol int) (*NetlinkSocket, error) {
    c, err := executeInNetns(newNs, curNs)
    if err != nil {
        return nil, err
    }
    defer c()
    return getNetlinkSocket(protocol)
}

// Namespace handling is not our focus here; what matters is how a socket
// object is created for the given protocol.
func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
    fd, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW|unix.SOCK_CLOEXEC, protocol)
    if err != nil {
        return nil, err
    }
    s := &NetlinkSocket{
        fd: int32(fd),
    }
    s.lsa.Family = unix.AF_NETLINK
    if err := unix.Bind(fd, &s.lsa); err != nil {
        unix.Close(fd)
        return nil, err
    }

    return s, nil
}

getNetlinkSocket follows the standard socket creation pattern:

  1. fd, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW|unix.SOCK_CLOEXEC, protocol) creates a netlink socket fd.
  2. unix.Bind(fd, &s.lsa) binds the socket, which is then returned (a standalone sketch of both steps follows).
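
To make this concrete, here is a minimal self-contained sketch of the same two steps done directly against golang.org/x/sys/unix. The Getsockname call at the end is added purely for illustration: it shows the kernel-assigned port ID, which is presumably what the library's s.GetPid() reads back later.

package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

func main() {
    // Step 1: create a raw netlink socket for the netfilter subsystem.
    fd, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW|unix.SOCK_CLOEXEC, unix.NETLINK_NETFILTER)
    if err != nil {
        panic(err)
    }
    defer unix.Close(fd)

    // Step 2: bind with Pid 0, letting the kernel assign a unique port ID to this socket.
    if err := unix.Bind(fd, &unix.SockaddrNetlink{Family: unix.AF_NETLINK}); err != nil {
        panic(err)
    }

    // For illustration: read back the port ID the kernel picked.
    sa, err := unix.Getsockname(fd)
    if err != nil {
        panic(err)
    }
    fmt.Printf("netlink socket bound, port id: %d\n", sa.(*unix.SockaddrNetlink).Pid)
}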

Finally, back in NewHandle:

// Obtain the netfilter netlink socket fd
s, err := GetNetlinkSocketAt(newNs, curNs, f)

// Create a SocketHandle object and store the socket in its Socket field.
// SocketHandle contains the netlink socket and the associated
// sequence counter for a specific netlink family
type SocketHandle struct {
    Seq    uint32
    Socket *NetlinkSocket
}

type NetlinkSocket struct {
    fd  int32
    lsa unix.SockaddrNetlink
    sync.Mutex
}

h.sockets[f] = &SocketHandle{Socket: s}

ConntrackTableFlush(lib.ConntrackTable)

First, the value of lib.ConntrackTable:

type ConntrackTableType uint8

const (
    ConntrackTable       = ConntrackTableType(1)
    ConntrackExpectTable = ConntrackTableType(2)
)

Next comes ConntrackTableFlush. As its name suggests, it flushes the connection tracking table:

func (h *Handle) ConntrackTableFlush(table ConntrackTableType) error {
    req := h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK)
    _, err := req.Execute(unix.NETLINK_NETFILTER, 0)
    return err
}

func (h *Handle) newConntrackRequest(table ConntrackTableType, family InetFamily, operation, flags int) *NetlinkRequest {
    // Create the Netlink request object
    req := h.newNetlinkRequest((int(table)<<8)|operation, flags)
    // Add the netfilter header
    msg := &Nfgenmsg{
        NfgenFamily: uint8(family),
        Version:     NFNETLINK_V0,
        ResId:       0,
    }
    req.AddData(msg)
    return req
}

req := h.newNetlinkRequest((int(table)<<8)|operation, flags) creates a NetlinkRequest. The parameter values at this point are (see the packing sketch after this list):

  1. table: 1
  2. family: unix.AF_INET (value 2)
  3. operation: 2
  4. flags: unix.NLM_F_ACK (value 4)
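
The expression (int(table)<<8)|operation packs the nfnetlink subsystem ID into the high byte of nlmsg_type and the operation into the low byte. A quick sketch with constants mirroring the values above (NFNL_SUBSYS_CTNETLINK is 1 in the kernel headers):

package main

import "fmt"

const (
    ConntrackTable       = 1 // NFNL_SUBSYS_CTNETLINK
    IPCTNL_MSG_CT_DELETE = 2
)

func main() {
    msgType := (ConntrackTable << 8) | IPCTNL_MSG_CT_DELETE
    fmt.Printf("nlmsg_type = 0x%04x (%d)\n", msgType, msgType) // nlmsg_type = 0x0102 (258)
}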

Now let us analyze newNetlinkRequest:

type NetlinkRequest struct {
    unix.NlMsghdr
    Data    []NetlinkRequestData
    RawData []byte
    Sockets map[int]*SocketHandle
}

func (h *Handle) newNetlinkRequest(proto, flags int) *NetlinkRequest {
    // Do this so that package API still use nl package variable nextSeqNr
    if h.sockets == nil {
        return NewNetlinkRequest(proto, flags)
    }
    return &NetlinkRequest{
        // A standard netlink message header;
        // see: https://blog.spoock.com/2019/11/25/lkm/#%E7%A4%BA%E4%BE%8B%E7%A8%8B%E5%BA%8F
        /*
            nlh = (struct nlmsghdr *) malloc(NLMSG_SPACE(MAX_PAYLOAD));
            memset(nlh, 0, NLMSG_SPACE(MAX_PAYLOAD));
            nlh->nlmsg_len = NLMSG_SPACE(MAX_PAYLOAD);
            nlh->nlmsg_pid = getpid();
            nlh->nlmsg_flags = 0;
        */
        NlMsghdr: unix.NlMsghdr{
            Len:   uint32(unix.SizeofNlMsghdr),
            Type:  uint16(proto),
            Flags: unix.NLM_F_REQUEST | uint16(flags),
        },
        Sockets: h.sockets,
    }
}

According to NETLINK(7):

nlmsg_type can be one of the standard message types: NLMSG_NOOP message is to be ignored, NLMSG_ERROR message signals an error and the payload contains an nlmsgerr structure, NLMSG_DONE message terminates a multipart message.

struct nlmsgerr {
    int error;           /* Negative errno or 0 for acknowledgements */
    struct nlmsghdr msg; /* Message header that caused the error */
};

So nlmsg_type has four standard types, whose integer values are:

// reference: https://elixir.bootlin.com/linux/v4.7/source/include/uapi/linux/netlink.h#L95
#define NLMSG_NOOP    0x1 /* Nothing. */
#define NLMSG_ERROR   0x2 /* Error */
#define NLMSG_DONE    0x3 /* End of a dump */
#define NLMSG_OVERRUN 0x4 /* Data lost */

In addition, include/uapi/linux/netfilter/nfnetlink_conntrack.h defines the conntrack message control types, as shown below:

enum cntl_msg_types {
    IPCTNL_MSG_CT_NEW,
    IPCTNL_MSG_CT_GET,
    IPCTNL_MSG_CT_DELETE,
    IPCTNL_MSG_CT_GET_CTRZERO,
    IPCTNL_MSG_CT_GET_STATS_CPU,
    IPCTNL_MSG_CT_GET_STATS,
    IPCTNL_MSG_CT_GET_DYING,
    IPCTNL_MSG_CT_GET_UNCONFIRMED,

    IPCTNL_MSG_MAX
};

In our example the operation value 2 corresponds to IPCTNL_MSG_CT_DELETE in cntl_msg_types.
As for nlmsg_flags:

NLM_F_REQUEST   Must be set on all request messages.
NLM_F_MULTI     The message is part of a multipart message terminated by NLMSG_DONE.
NLM_F_ACK       Request for an acknowledgment on success.
NLM_F_ECHO      Echo this request.

(There are additional flag bits for GET requests.)

The corresponding values are:

/* Flags values */

#define NLM_F_REQUEST 1 /* It is request message. */
#define NLM_F_MULTI 2 /* Multipart message, terminated by NLMSG_DONE */
#define NLM_F_ACK 4 /* Reply with ack, with zero or error code */
#define NLM_F_ECHO 8 /* Echo this request */
#define NLM_F_DUMP_INTR 16 /* Dump was inconsistent due to sequence change */

So in our example the values 1 and 4 are chosen, i.e. NLM_F_REQUEST|NLM_F_ACK: this is a request message, and an acknowledgment is expected.
Finally newNetlinkRequest completes and returns a NetlinkRequest struct with the following content:

&NetlinkRequest{
    NlMsghdr: unix.NlMsghdr{
        Len:   uint32(unix.SizeofNlMsghdr),
        Type:  uint16(proto),
        Flags: unix.NLM_F_REQUEST | uint16(flags),
    },
    // The sockets obtained earlier via GetNetlinkSocketAt()
    Sockets: h.sockets,
}

Back in newConntrackRequest:

func (h *Handle) newConntrackRequest(table ConntrackTableType, family InetFamily, operation, flags int) *NetlinkRequest {
    // Create the Netlink request object
    req := h.newNetlinkRequest((int(table)<<8)|operation, flags)
    // Add the netfilter header
    msg := &Nfgenmsg{
        NfgenFamily: uint8(family),
        Version:     NFNETLINK_V0,
        ResId:       0,
    }
    req.AddData(msg)
    return req
}

func (req *NetlinkRequest) AddData(data NetlinkRequestData) {
    req.Data = append(req.Data, data)
}

type NetlinkRequest struct {
    unix.NlMsghdr
    Data    []NetlinkRequestData
    RawData []byte
    Sockets map[int]*SocketHandle
}

type NetlinkRequestData interface {
    Len() int
    Serialize() []byte
}

After the NetlinkRequest struct is obtained, the AddData() method appends the payload to the Data field, a slice of the NetlinkRequestData interface; a minimal illustration of this interface follows.
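
As an illustration (this type is hypothetical, not from the repository), anything with these two methods can be handed to AddData(); Nfgenmsg, analyzed below, is the implementation actually used here:

// rawPayload is a hypothetical NetlinkRequestData implementation wrapping pre-serialized bytes.
type rawPayload []byte

func (p rawPayload) Len() int          { return len(p) }
func (p rawPayload) Serialize() []byte { return p }

// It could be attached just like the Nfgenmsg above:
//     req.AddData(rawPayload{0x02, 0x00, 0x00, 0x00})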
Once newConntrackRequest has finished, control returns to ConntrackTableFlush:

func (h *Handle) ConntrackTableFlush(table ConntrackTableType) error {
    req := h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK)
    _, err := req.Execute(unix.NETLINK_NETFILTER, 0)
    return err
}

which then calls the NetlinkRequest's Execute() method:

// Execute the request against the given sockType.
// Returns a list of netlink messages in serialized format, optionally filtered
// by resType.
func (req *NetlinkRequest) Execute(sockType int, resType uint16) ([][]byte, error) {
    var (
        s   *NetlinkSocket
        err error
    )

    if req.Sockets != nil {
        // Look up the socket object for this netlink family
        if sh, ok := req.Sockets[sockType]; ok {
            s = sh.Socket
            // Bump the per-socket sequence number and use it for this request
            req.Seq = atomic.AddUint32(&sh.Seq, 1)
        }
    }
    sharedSocket := s != nil

    if s == nil {
        s, err = getNetlinkSocket(sockType)
        if err != nil {
            return nil, err
        }
        defer s.Close()
    } else {
        s.Lock()
        defer s.Unlock()
    }
    if err := s.Send(req); err != nil {
        return nil, err
    }

    pid, err := s.GetPid()
    if err != nil {
        return nil, err
    }

    var res [][]byte

done:
    for {
        msgs, err := s.Receive()
        if err != nil {
            return nil, err
        }
        for _, m := range msgs {
            if m.Header.Seq != req.Seq {
                if sharedSocket {
                    continue
                }
                return nil, fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
            }
            if m.Header.Pid != pid {
                return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
            }
            if m.Header.Type == unix.NLMSG_DONE {
                break done
            }
            if m.Header.Type == unix.NLMSG_ERROR {
                native := NativeEndian()
                error := int32(native.Uint32(m.Data[0:4]))
                if error == 0 {
                    break done
                }
                return nil, syscall.Errno(-error)
            }
            if resType != 0 && m.Header.Type != resType {
                continue
            }
            res = append(res, m.Data)
            if m.Header.Flags&unix.NLM_F_MULTI == 0 {
                break done
            }
        }
    }
    return res, nil
}

The key call here is s.Send(req), where s is the NetlinkSocket and req is the object holding the request data:

type NetlinkSocket struct {
    fd  int32
    lsa unix.SockaddrNetlink
    sync.Mutex
}

type NetlinkRequest struct {
    unix.NlMsghdr
    Data    []NetlinkRequestData
    RawData []byte
    Sockets map[int]*SocketHandle
}

func (s *NetlinkSocket) Send(request *NetlinkRequest) error {
    fd := int(atomic.LoadInt32(&s.fd))
    if fd < 0 {
        return fmt.Errorf("Send called on a closed socket")
    }
    if err := unix.Sendto(fd, request.Serialize(), 0, &s.lsa); err != nil {
        return err
    }
    return nil
}

At this point the NetlinkSocket and the NetlinkRequest hold exactly the values built up in the previous steps.

NetlinkSocket's Send() method is essentially a call to unix.Sendto(fd, request.Serialize(), 0, &s.lsa); the request data just has to be serialized first, which invokes the NetlinkRequest's Serialize() method:

// Serialize the Netlink Request into a byte array
func (req *NetlinkRequest) Serialize() []byte {
    length := unix.SizeofNlMsghdr
    dataBytes := make([][]byte, len(req.Data))
    for i, data := range req.Data {
        // Here each entry of req.Data is an Nfgenmsg struct
        dataBytes[i] = data.Serialize()
        length = length + len(dataBytes[i])
    }
    length += len(req.RawData)

    req.Len = uint32(length)
    b := make([]byte, length)
    hdr := (*(*[unix.SizeofNlMsghdr]byte)(unsafe.Pointer(req)))[:]
    next := unix.SizeofNlMsghdr
    copy(b[0:next], hdr)
    for _, data := range dataBytes {
        for _, dataByte := range data {
            b[next] = dataByte
            next = next + 1
        }
    }
    // Add the raw data if any
    if len(req.RawData) > 0 {
        copy(b[next:length], req.RawData)
    }
    return b
}

So data.Serialize() here really means Nfgenmsg's Serialize() method:

type Nfgenmsg struct {
    NfgenFamily uint8
    Version     uint8
    ResId       uint16 // big endian
}

func (msg *Nfgenmsg) Len() int {
    return SizeofNfgenmsg
}

func DeserializeNfgenmsg(b []byte) *Nfgenmsg {
    return (*Nfgenmsg)(unsafe.Pointer(&b[0:SizeofNfgenmsg][0]))
}

// Reinterpret the Nfgenmsg as a byte slice.
func (msg *Nfgenmsg) Serialize() []byte {
    return (*(*[SizeofNfgenmsg]byte)(unsafe.Pointer(msg)))[:]
}

Back in Serialize:

b := make([]byte, length)
hdr := (*(*[unix.SizeofNlMsghdr]byte)(unsafe.Pointer(req)))[:]
next := unix.SizeofNlMsghdr
copy(b[0:next], hdr)
for _, data := range dataBytes {
    for _, dataByte := range data {
        b[next] = dataByte
        next = next + 1
    }
}

req is reinterpreted as SizeofNlMsghdr raw header bytes, and then the header and all of req.Data are copied into b (b := make([]byte, length)). Finally unix.Sendto(fd, request.Serialize(), 0, &s.lsa) sends the data.
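
For reference, here is a hand-rolled sketch of the same 20-byte flush request that Serialize produces, assuming a little-endian host and the sequence number 1 that Execute assigned above:

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    buf := make([]byte, 20) // 16-byte nlmsghdr + 4-byte nfgenmsg
    binary.LittleEndian.PutUint32(buf[0:4], 20)     // nlmsg_len
    binary.LittleEndian.PutUint16(buf[4:6], 0x0102) // nlmsg_type: CTNETLINK<<8 | IPCTNL_MSG_CT_DELETE
    binary.LittleEndian.PutUint16(buf[6:8], 1|4)    // nlmsg_flags: NLM_F_REQUEST | NLM_F_ACK
    binary.LittleEndian.PutUint32(buf[8:12], 1)     // nlmsg_seq, as set by Execute
    binary.LittleEndian.PutUint32(buf[12:16], 0)    // nlmsg_pid
    buf[16] = 2 // nfgen_family: AF_INET
    buf[17] = 0 // version: NFNETLINK_V0
    // buf[18:20]: res_id stays 0 (stored big-endian)
    fmt.Printf("% x\n", buf)
}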
Execution then continues back in the Execute function. After Sendto() has sent the data, the next step is to receive the reply.
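
The implementation of s.Receive() is not quoted in this commit, but a minimal sketch of such a receive, assuming unix.Recvfrom plus the standard library's syscall.ParseNetlinkMessage for splitting the buffer, would look roughly like this (the library's actual code differs in details):

// Sketch only, not the library's Receive; assumes imports "syscall" and "golang.org/x/sys/unix".
func receive(fd int) ([]syscall.NetlinkMessage, error) {
    buf := make([]byte, 65536) // conntrack dumps can be large, so use a generous buffer
    n, _, err := unix.Recvfrom(fd, buf, 0)
    if err != nil {
        return nil, err
    }
    // Split the raw bytes into individual netlink messages (header plus payload each).
    return syscall.ParseNetlinkMessage(buf[:n])
}

The messages are then consumed by the loop in Execute: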

done:
    for {
        // Receive messages from the socket
        msgs, err := s.Receive()
        if err != nil {
            return nil, err
        }
        for _, m := range msgs {
            // Check that the sequence number matches the request
            if m.Header.Seq != req.Seq {
                if sharedSocket {
                    continue
                }
                return nil, fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
            }
            // Check that the pid matches
            if m.Header.Pid != pid {
                return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
            }
            // Dispatch on the netlink message header type
            if m.Header.Type == unix.NLMSG_DONE { // end of a multipart dump
                break done
            }
            if m.Header.Type == unix.NLMSG_ERROR { // error (or acknowledgment) message
                native := NativeEndian()
                error := int32(native.Uint32(m.Data[0:4]))
                if error == 0 {
                    break done
                }
                return nil, syscall.Errno(-error)
            }
            if resType != 0 && m.Header.Type != resType {
                continue
            }
            // Collect the payload
            res = append(res, m.Data)
            // If NLM_F_MULTI is not set, this is a single reply, so stop here as well.
            if m.Header.Flags&unix.NLM_F_MULTI == 0 {
                break done
            }
        }
    }
    return res, nil

In the unix.NLMSG_ERROR branch, if the first 4 bytes of the payload (the error field) are 0, the request is likewise considered finished:

if m.Header.Type == unix.NLMSG_ERROR { // error (or acknowledgment) message
    native := NativeEndian()
    error := int32(native.Uint32(m.Data[0:4]))
    if error == 0 {
        break done
    }
    return nil, syscall.Errno(-error)
}

Inspecting the reply to our request confirms this: although the response packet is of type NLMSG_ERROR, as long as the first 4 bytes of its error field are 0, no error occurred and the request executed successfully.
At this point h.ConntrackTableFlush(lib.ConntrackTable) has run to completion. In essence it sends an IPCTNL_MSG_CT_DELETE request that flushes the connection tracking table, the equivalent of conntrack -F.

ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))

After ConntrackTableFlush(lib.ConntrackTable) has finished, the program moves on to ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET)) to fetch the connection tracking data:

func (h *Handle) ConntrackTableList(table ConntrackTableType, family InetFamily) ([]*ConntrackFlow, error) {
    res, err := h.dumpConntrackTable(table, family)
    if err != nil {
        return nil, err
    }

    // Deserialize all the flows
    var result []*ConntrackFlow
    for _, dataRaw := range res {
        result = append(result, parseRawData(dataRaw))
    }

    return result, nil
}

func (h *Handle) dumpConntrackTable(table ConntrackTableType, family InetFamily) ([][]byte, error) {
    req := h.newConntrackRequest(table, family, IPCTNL_MSG_CT_GET, unix.NLM_F_DUMP)
    return req.Execute(unix.NETLINK_NETFILTER, 0)
}

As you can see, ConntrackTableList follows essentially the same flow as the ConntrackTableFlush analyzed above. The differences are:

  1. ConntrackTableList calls newConntrackRequest(table, family, IPCTNL_MSG_CT_GET, unix.NLM_F_DUMP) to fetch information, whereas ConntrackTableFlush calls h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK) to clear the connection tracking table.
  2. In ConntrackTableList, res, err := h.dumpConntrackTable(table, family) keeps the value returned by the request, i.e. the connection tracking data. In ConntrackTableFlush it is _, err := req.Execute(unix.NETLINK_NETFILTER, 0), which discards the return value: for a flush we only care whether the operation succeeded, and in fact no data comes back anyway.

Next we focus on how ConntrackTableList parses the returned data.

parseRawData(dataRaw)

func parseRawData(data []byte) *ConntrackFlow {
    s := &ConntrackFlow{}
    var proto uint8
    // First there is the Nfgenmsg header
    // consume only the family field
    reader := bytes.NewReader(data)
    binary.Read(reader, NativeEndian(), &s.FamilyType)

    // skip rest of the Netfilter header
    reader.Seek(3, seekCurrent)
    // The message structure is the following:
    // <len, NLA_F_NESTED|CTA_TUPLE_ORIG> 4 bytes
    // <len, NLA_F_NESTED|CTA_TUPLE_IP> 4 bytes
    // flow information of the forward flow
    // <len, NLA_F_NESTED|CTA_TUPLE_REPLY> 4 bytes
    // <len, NLA_F_NESTED|CTA_TUPLE_IP> 4 bytes
    // flow information of the reverse flow
    for reader.Len() > 0 {
        if nested, t, l := parseNfAttrTL(reader); nested {
            switch t {
            case CTA_TUPLE_ORIG:
                if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
                    proto = parseIpTuple(reader, &s.Forward)
                }
            case CTA_TUPLE_REPLY:
                if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
                    parseIpTuple(reader, &s.Reverse)
                } else {
                    // Header not recognized skip it
                    reader.Seek(int64(l), seekCurrent)
                }
            case CTA_COUNTERS_ORIG:
                s.Forward.Bytes, s.Forward.Packets = parseByteAndPacketCounters(reader)
            case CTA_COUNTERS_REPLY:
                s.Reverse.Bytes, s.Reverse.Packets = parseByteAndPacketCounters(reader)
            }
        }
    }
    if proto == TCP_PROTO {
        reader.Seek(64, seekCurrent)
        _, t, _, v := parseNfAttrTLV(reader)
        if t == CTA_MARK {
            s.Mark = uint32(v[3])
        }
    } else if proto == UDP_PROTO {
        reader.Seek(16, seekCurrent)
        _, t, _, v := parseNfAttrTLV(reader)
        if t == CTA_MARK {
            s.Mark = uint32(v[3])
        }
    }
    return s
}

skip header

reader := bytes.NewReader(data)
binary.Read(reader, NativeEndian(), &s.FamilyType)

// skip rest of the Netfilter header
reader.Seek(3, seekCurrent)

After wrapping the data in a Reader, the first 4 bytes are consumed: one byte is read into s.FamilyType and the remaining 3 are skipped. The comment describes this as skipping the Netfilter header, because the first 4 bytes of the netlink payload generally carry the nfgenmsg:

/* General form of address family dependent message. */
struct nfgenmsg {
    __u8   nfgen_family; /* AF_xxx */            // here NFPROTO_IPV4 (2), since we fetch IPv4 conntrack info
    __u8   version;      /* nfnetlink version */ // normally NFNETLINK_V0 (0)
    __be16 res_id;       /* resource id */       // normally 0
};

enum {
    NFPROTO_UNSPEC   = 0,
    NFPROTO_INET     = 1,
    NFPROTO_IPV4     = 2,
    NFPROTO_ARP      = 3,
    NFPROTO_NETDEV   = 5,
    NFPROTO_BRIDGE   = 7,
    NFPROTO_IPV6     = 10,
    NFPROTO_DECNET   = 12,
    NFPROTO_NUMPROTO,
};

parseNfAttrTL(reader)

The program parses the reader to extract the attribute data:

func parseNfAttrTL(r *bytes.Reader) (isNested bool, attrType, len uint16) {
    binary.Read(r, NativeEndian(), &len)
    len -= SizeofNfattr

    binary.Read(r, NativeEndian(), &attrType)
    isNested = (attrType & NLA_F_NESTED) == NLA_F_NESTED
    attrType = attrType & (NLA_F_NESTED - 1)

    return isNested, attrType, len
}
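
What parseNfAttrTL is decoding is a netlink attribute header: 2 bytes of length followed by 2 bytes of type, where bit 0x8000 of the type is NLA_F_NESTED. A standalone sketch with hand-crafted example bytes (little-endian host assumed):

package main

import (
    "encoding/binary"
    "fmt"
)

const NLA_F_NESTED = 0x8000

func main() {
    // Hand-crafted example: nla_len = 52, nla_type = NLA_F_NESTED|CTA_TUPLE_ORIG (1).
    hdr := []byte{0x34, 0x00, 0x01, 0x80}
    length := binary.LittleEndian.Uint16(hdr[0:2])
    attrType := binary.LittleEndian.Uint16(hdr[2:4])
    nested := attrType&NLA_F_NESTED != 0
    // Masking with NLA_F_NESTED-1 (0x7fff), as parseNfAttrTL does, strips the flag bit.
    fmt.Println(length, attrType&(NLA_F_NESTED-1), nested) // 52 1 true
}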

The first attribute parsed at this point turns out to be a nested attribute with attrType 1. The attrType values are defined as follows:

enum ctattr_type {
    CTA_UNSPEC,
    CTA_TUPLE_ORIG,
    CTA_TUPLE_REPLY,
    CTA_STATUS,
    CTA_PROTOINFO,
    CTA_HELP,
    CTA_NAT_SRC,
#define CTA_NAT CTA_NAT_SRC /* backwards compatibility */
    CTA_TIMEOUT,
    CTA_MARK,
    CTA_COUNTERS_ORIG,
    CTA_COUNTERS_REPLY,
    CTA_USE,
    CTA_ID,
    CTA_NAT_DST,
    CTA_TUPLE_MASTER,
    CTA_SEQ_ADJ_ORIG,
    CTA_NAT_SEQ_ADJ_ORIG = CTA_SEQ_ADJ_ORIG,
    CTA_SEQ_ADJ_REPLY,
    CTA_NAT_SEQ_ADJ_REPLY = CTA_SEQ_ADJ_REPLY,
    CTA_SECMARK, /* obsolete */
    CTA_ZONE,
    CTA_SECCTX,
    CTA_TIMESTAMP,
    CTA_MARK_MASK,
    CTA_LABELS,
    CTA_LABELS_MASK,
    __CTA_MAX
};

The current value 1 corresponds to the CTA_TUPLE_ORIG type, which is handled by the following code:

case CTA_TUPLE_ORIG:
    if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
        proto = parseIpTuple(reader, &s.Forward)
    }

parseNfAttrTL(reader) is applied once more; this time it returns a nested attribute whose type is CTA_TUPLE_IP.

With the condition satisfied, the program enters parseIpTuple(reader, &s.Forward) to continue parsing the data:

// This method parses the ip tuple structure
// The message structure is the following:
// <len, [CTA_IP_V4_SRC|CTA_IP_V6_SRC], 16 bytes for the IP>
// <len, [CTA_IP_V4_DST|CTA_IP_V6_DST], 16 bytes for the IP>
// <len, NLA_F_NESTED|nl.CTA_TUPLE_PROTO, 1 byte for the protocol, 3 bytes of padding>
// <len, CTA_PROTO_SRC_PORT, 2 bytes for the source port, 2 bytes of padding>
// <len, CTA_PROTO_DST_PORT, 2 bytes for the destination port, 2 bytes of padding>
func parseIpTuple(reader *bytes.Reader, tpl *ipTuple) uint8 {
    for i := 0; i < 2; i++ {
        _, t, _, v := parseNfAttrTLV(reader)
        /*
            reference: include/uapi/linux/netfilter/nfnetlink_conntrack.h
            enum ctattr_ip {
                CTA_IP_UNSPEC,
                CTA_IP_V4_SRC,
                CTA_IP_V4_DST,
                CTA_IP_V6_SRC,
                CTA_IP_V6_DST,
                __CTA_IP_MAX
            };
        */
        // Parse the source and destination addresses
        switch t {
        case CTA_IP_V4_SRC, CTA_IP_V6_SRC:
            tpl.SrcIP = v
        case CTA_IP_V4_DST, CTA_IP_V6_DST:
            tpl.DstIP = v
        }
    }
    // Skip the next 4 bytes nl.NLA_F_NESTED|nl.CTA_TUPLE_PROTO
    reader.Seek(4, seekCurrent)
    _, t, _, v := parseNfAttrTLV(reader)
    // Parse the layer-4 protocol number
    /*
        reference: include/uapi/linux/netfilter/nfnetlink_conntrack.h
        enum ctattr_l4proto {
            CTA_PROTO_UNSPEC,
            CTA_PROTO_NUM,
            CTA_PROTO_SRC_PORT,
            CTA_PROTO_DST_PORT,
            CTA_PROTO_ICMP_ID,
            CTA_PROTO_ICMP_TYPE,
            CTA_PROTO_ICMP_CODE,
            CTA_PROTO_ICMPV6_ID,
            CTA_PROTO_ICMPV6_TYPE,
            CTA_PROTO_ICMPV6_CODE,
            __CTA_PROTO_MAX
        };
    */
    if t == CTA_PROTO_NUM {
        // The protocol number; in our example it is 6, i.e. TCP
        tpl.Protocol = uint8(v[0])
    }
    // Skip some padding 3 bytes
    reader.Seek(3, seekCurrent)
    for i := 0; i < 2; i++ {
        // Parse the source and destination ports the same way
        _, t, _ := parseNfAttrTL(reader)
        switch t {
        case CTA_PROTO_SRC_PORT:
            parseBERaw16(reader, &tpl.SrcPort)
        case CTA_PROTO_DST_PORT:
            parseBERaw16(reader, &tpl.DstPort)
        }
        // Skip some padding 2 byte
        reader.Seek(2, seekCurrent)
    }
    return tpl.Protocol
}

// Read an attribute together with its value
func parseNfAttrTLV(r *bytes.Reader) (isNested bool, attrType, len uint16, value []byte) {
    isNested, attrType, len = parseNfAttrTL(r)

    value = make([]byte, len)
    binary.Read(r, binary.BigEndian, &value)
    return isNested, attrType, len, value
}

func parseNfAttrTL(r *bytes.Reader) (isNested bool, attrType, len uint16) {
    binary.Read(r, NativeEndian(), &len)
    len -= SizeofNfattr

    binary.Read(r, NativeEndian(), &attrType)
    isNested = (attrType & NLA_F_NESTED) == NLA_F_NESTED
    attrType = attrType & (NLA_F_NESTED - 1)

    return isNested, attrType, len
}
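
parseBERaw16 itself is not quoted in this commit; presumably it just reads a 16-bit big-endian value, since ports travel in network byte order. A sketch:

// A sketch of what parseBERaw16 presumably does (assumes "bytes" and "encoding/binary").
func parseBERaw16(r *bytes.Reader, v *uint16) {
    binary.Read(r, binary.BigEndian, v)
}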

Once the data has been parsed in this fashion, the information needs to be returned:

type ipTuple struct {
    Bytes    uint64
    DstIP    net.IP
    DstPort  uint16
    Packets  uint64
    Protocol uint8
    SrcIP    net.IP
    SrcPort  uint16
}

type ConntrackFlow struct {
    FamilyType uint8
    Forward    ipTuple
    Reverse    ipTuple
    Mark       uint32
}

s := &ConntrackFlow{}
if proto == TCP_PROTO {
    reader.Seek(64, seekCurrent)
    _, t, _, v := parseNfAttrTLV(reader)
    if t == CTA_MARK {
        s.Mark = uint32(v[3])
    }
} else if proto == UDP_PROTO {
    reader.Seek(16, seekCurrent)
    _, t, _, v := parseNfAttrTLV(reader)
    if t == CTA_MARK {
        s.Mark = uint32(v[3])
    }
}

The MARK value is parsed and filled into the ConntrackFlow struct.

output

Having parsed the data, the final step is to print the results:

for {
    flows, err := h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))
    if err == nil {
        if len(flows) != 0 {
            for _, flow := range flows {
                fmt.Println(flow)
            }
        }
    }
    <-time.After(time.Millisecond * 50)
}

The final output looks like this:

tcp	6 src=IP1 dst=IP2 sport=33508 dport=17250 packets=175 bytes=23397	src=IP2 dst=IP1 sport=17250 dport=33508 packets=214 bytes=28663 mark=0
udp 17 src=IP3 dst=IP4 sport=5353 dport=5353 packets=5 bytes=469 src=IP4 dst=IP3 sport=5353 dport=5353 packets=0 bytes=0 mark=0
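
fmt.Println(flow) produces this format because ConntrackFlow implements fmt.Stringer. The String() method is not quoted in this commit; here is a sketch reconstructed from the sample output above, where the L4ProtoMap lookup table is an assumption:

// Sketch of a Stringer matching the output above; L4ProtoMap is an assumed helper.
var L4ProtoMap = map[uint8]string{6: "tcp", 17: "udp"}

func (s *ConntrackFlow) String() string {
    return fmt.Sprintf("%s\t%d src=%v dst=%v sport=%d dport=%d packets=%d bytes=%d\tsrc=%v dst=%v sport=%d dport=%d packets=%d bytes=%d mark=%d",
        L4ProtoMap[s.Forward.Protocol], s.Forward.Protocol,
        s.Forward.SrcIP, s.Forward.DstIP, s.Forward.SrcPort, s.Forward.DstPort, s.Forward.Packets, s.Forward.Bytes,
        s.Reverse.SrcIP, s.Reverse.DstIP, s.Reverse.SrcPort, s.Reverse.DstPort, s.Reverse.Packets, s.Reverse.Bytes,
        s.Mark)
}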

Summary

A great deal of time went into analyzing the whole process of retrieving connection tracking information, and it paid off. Due to limits of time and energy, the final parsing of the data that netfilter returns has not been covered in full detail; that will have to wait for a follow-up article.