近些年来互联网发展迅速,各种类型的产品得到充足的发展,交互性和复杂度都在迅速提高,都需要在极短的时间内将数据同 时投递给大量用户,因此传输技术自然变为未来制约发展的一个重要因素。在此之前对于通信协议首选TCP, 而今因为TCP的种种限制,UDP得到了很多开发人员的青睐,并在UDP的基础上开发出了众多的可靠算法,如 QUIC、KCP等,在此基础上对于UDP的关注面跨越了可靠性,进一步考虑放大UDP的通信能力,典型如何在短 时间内快速接收和处理超大量的数据包。在此之前,我曾研究UDP许多时日,试图在Go中最大化UDP通信能力;本文内容源自于对于对高数量级UDP通 信能力的优化经验,不同于形容TCP通信能力的单位,针对UDP的特性选择以“每秒多少个包”(PPS)来作为通 信能力单位更具有现实意义。

转自::https://mp.weixin.qq.com/s?__biz=MzA4ODg0NDkzOA==&mid=2247489667&idx=1&sn=7c4bd7d75e2b826fec59fb751db8ca67

参考:go语言中文文档:www.topgoer.com

如何实现最简epoll

对于UDP而言,严格来讲并不需要自己额外再实现epoll,但为了利用多核性能配合端口重用做到“多线程”绑定 统一地址端口,实现简化epoll是很必要的。与 *unix 相关的内容在 golang.org/x/sys/unix 包中,采用系统调用方式简化epoll。对于epoll的使用,主要 在三个API中:

int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int max_events, int timeout);

那么只需要在Go中调用到这三个API即可。

func PollerInit() (*Poller, error) { fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) if err != nil { return nil, os.NewSyscallError("epoll_create1", err) } poller := &Poller{ fd: fd, } return poller, nil }

正常情况下使用epoll的流程是拿到fd之后可添加、修改或删除触发实现,为了实现方便,将 epoll_ctl 的调用 改成了如下实现:

func (poller *Poller) Add(fd int, ev string) error { e := &unix.EpollEvent{ Fd: int32(fd), } switch ev { case "r": e.Events = unix.EPOLLIN case "w": e.Events = unix.EPOLLOUT case "rw": e.Events = unix.EPOLLIN | unix.EPOLLOUT default: return fmt.Errorf("unknow epoll event type") } return os.NewSyscallError("epoll_ctl add", unix.EpollCtl(poller.fd, unix.EPOLL_CTL_ADD, fd, e)) } func (poller *Poller) Mod(fd int, ev string) error { e := &unix.EpollEvent{ Fd: int32(fd), } switch ev { case "r": e.Events = unix.EPOLLIN case "w": e.Events = unix.EPOLLOUT case "rw": e.Events = unix.EPOLLIN | unix.EPOLLOUT default: return fmt.Errorf("unknow epoll event type") } return os.NewSyscallError("epoll_ctl mod", unix.EpollCtl(poller.fd, unix.EPOLL_CTL_MOD, fd, e)) } func (poller *Poller) Del(fd int) error { return os.NewSyscallError("epoll_ctl del", unix.EpollCtl(poller.fd, unix.EPOLL_CTL_DEL, fd, nil)) } 接下来是epoll_wait的调用,对应的事件通过回调函数返回给上层: func (poller *Poller) Polling(eventHandler func(fd int32, ev uint32)) error { evs := make([]unix.EpollEvent, EPollEventSize) for { n, err := unix.EpollWait(poller.fd, evs, 0) if err != nil { log.Printf("epoll_wait err: %v", err) } if n < 0 && err == unix.EINTR { continue } if err != nil { return os.NewSyscallError("epoll_wait", err) } for i := 0; i < n; i { eventHandler(evs[i].Fd, evs[i].Events) } } }

端口重用:SO_REUSEPORT

SO_REUSEPORT是linux 3.9版本新添加的,支持多个进程或线程绑定到同一地址端口。有了该选项之后,每个进 程或者线程都有属于自己的server socket,避免锁的竞争,可以充分利用到CPU多核资源。有了该选项之后, 可以考虑这样一种结构:每个goroutine拥有一个server socket,对应的拥有epoll fd,实际上就是epoll-per-goroutine结构,最大化利用CPU多核资源:

go语言中的栈数据结构(go语言如何实现百万级UDP通信)(1)

代码上实现起来相当简单,遵照SO_REUSEPORT的使用规则即可:

func NewUDPSocket(network, addr string, reusePort bool) (int, unix.Sockaddr, error) { var sa unix.Sockaddr udpAddr, err := net.ResolveUDPAddr(network, addr) if err != nil { return 0, nil, fmt.Errorf("resolve addr err: %v", err) } netFamily := unix.AF_INET if udpAddr.IP.To4() == nil { netFamily = unix.AF_INET6 } // listen socket fd syscall.ForkLock.Lock() fd, err := unix.Socket(netFamily, unix.SOCK_DGRAM|unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC, unix.IPPROTO_UDP) if err == nil { unix.CloseOnExec(fd) } syscall.ForkLock.Unlock() defer func() { if err != nil { _ = unix.Close(fd) } }() switch network { case "udp": sockaddr := &unix.SockaddrInet4{} sockaddr.Port = udpAddr.Port sa = sockaddr case "udp4": sockaddr := &unix.SockaddrInet4{} sockaddr.Port = udpAddr.Port copy(sockaddr.Addr[:], udpAddr.IP.To4()) sa = sockaddr case "udp6": // IPv6 zone sockaddr := &unix.SockaddrInet6{} copy(sockaddr.Addr[:], udpAddr.IP.To16()) if udpAddr.Zone != "" { var iface *net.Interface iface, err = net.InterfaceByName(udpAddr.Zone) if err != nil { return 0, nil, fmt.Errorf("parse UDPAddr.Zone err: %v", err) } sockaddr.ZoneId = uint32(iface.Index) } sockaddr.Port = udpAddr.Port netFamily = unix.AF_INET6 default: return 0, nil, fmt.Errorf("not support network") } if reusePort { if err = os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)); err != nil { return 0, nil, err } } if err = os.NewSyscallError("bind", unix.Bind(fd, sa)); err != nil { return 0, nil, err } return fd, sa, nil }

使用recvmmsg代替recvmsg

我们知道在调用recvmsg时会将收到的数据从内核空间拷贝至用户空间,每调用一次就会产生一次内核开销, 短时间内接收超大量的数据包累积起来的内核开销也很可观了,所以从linux 2.6.33开始,新增了 recvmmsg ,允许用户一次性接收多个数据包,对于 recvmmsg 的说明可以参考:recvmmsg document,这里主要说下如何在Go中调用 recvmmsg。 recvmmsg 依赖以下几个结构:

#include <sys/socket.h> struct mmsghdr { struct msghdr msg_hdr; /* Message header */ unsigned int msg_len; /* Number of received bytes for header */ }; struct iovec { /* Scatter/gather array items */ void *iov_base; /* Starting address */ size_t iov_len; /* Number of bytes to transfer */ }; struct msghdr { void *msg_name; /* Optional address */ socklen_t msg_namelen; /* Size of address */ struct iovec *msg_iov; /* Scatter/gather array */ size_t msg_iovlen; /* # elements in msg_iov */ void *msg_control; /* Ancillary data, see below */ size_t msg_controllen; /* Ancillary data buffer len */ int msg_flags; /* Flags on received message */ };

带外数据不在本文考虑范围内,因此 msg_control 和 msg_controllen 可以忽略,其中 iovec 为接收数据缓冲 区,本质上 recvmmsg 的传入参数就是mmsghdr数组,数据长度即为期望收到多少个数据包,理解这个结构之 后就好办了,我们可以构建在Go中构建对应的数据结构通过 unix.syscall6 实现调用 recvmmsg 。调用 recvmmsg 之前

func prepare(n, mtu int) ([]mmsghdr, [][]byte, [][]byte) { mms := make([]mmsghdr, n) buffers := make([][]byte, n) names := make([][]byte, n) for i := range mms { buffers[i] = make([]byte, mtu) names[i] = make([]byte, sizeofSockaddrInet6) v := []iovec{ {Base: (*byte)(unsafe.Pointer(&buffers[i][0])), Len: uint64(len(buffers[i]))}, } mms[i].Hdr.Iov = &v[0] mms[i].Hdr.Iovlen = uint64(len(v)) mms[i].Hdr.Name = (*byte)(unsafe.Pointer(&names[i][0])) mms[i].Hdr.Namelen = uint32(len(names[i])) // ignore mms[i].Hdr.Control and mms[i].Hdr.Controllen } return mms, buffers, names }

调用 recvmmsg :

func (rw *ReaderWriter) read() (int, error) { n, _, err := unix.Syscall6(unix.SYS_RECVMMSG, uintptr(rw.fd), uintptr(unsafe.Pointer(&rw.msgs[0])), uintptr(len(rw.msgs)), unix.MSG_WAITFORONE, 0, 0, ) if err != 0 { if err == unix.EAGAIN || err == unix.EWOULDBLOCK { return 0, nil } return 0, os.NewSyscallError("recvmmsg", fmt.Errorf("%v", unix.ErrnoName(err))) } return int(n), nil }

至于 mmsghr 结构,可以参考:golang.org/x/net/internal/socket/zsys_linux_amd64.go 。至于从 mmsghdr 结构中解析到远端地址和数据,可翻阅linux文档。

网卡多队列绑定CPU核心优化

我们知道,如果一个socket的所有操作都固定在某个CPU核心上是能获得一定的性能提升,如果网卡支持多队 列可以尝试这样一种优化方案:将网卡多队列均匀绑定到CPU多核心上,同时设置 SO_INCOMING_CPU 属性,将 socket的处理与某个CPU核心绑定,同时逻辑线程与某个CPU核心进行亲和性绑定,最终的结果是:某个逻辑线 程上总是处理特定的socket操作,简单来说就是路宽了,每条路上都井然有序,拥挤程度降级,性能得到提 升。那么在Go中能否实现这种优化方案?很可惜我没有找到明确的方法实施这种方案,主要原因是Go刻意弱化了线 程概念和操作,在Go中无法直接设置线程和CPU核心的亲和性以实现上述目的,有线索的同学可以指点一下。即便如此,但将网络多队列均匀到CPU多核心上是具有意义的。在实际测试中发现,偶尔会出现吞吐量下降, 重现率不高,偶然发现是某个CPU核心压力过高,查了网卡队列数据流向之后发现某些核心比较繁忙,开启网 卡多队列绑定到各个CPU核心上之后再次测试各个核心的压力都比较均匀,不至于会出现某个核心压力过高影 响runtime调度。关于这部分内容,这篇文章说的比较好可以作为优化参考:TCP加速技术解决方案

考虑CPU Cache

在我们的代码结构上实现了类似事件循环(event-loop),在这个event-loop中调用 epoll_wait ;前面我们说 过 epoll_wait 的事件是通过回调函数回调到event-loop中,也就是说会event-loop会被频繁的读写访问,此时 就有可能会出现event-loop在CPU中的访问命中率下降,其原理:单个CPU核在读取一个变量时,以cache line 的方式将后续的变量也读取进来,缓存在自己这个核的cache中,而后续的变量也可能被其他CPU核并行缓存。当前面的CPU对前面的变量进行写入时,该变量同样是以cache line为单位写回内存。此时在其他核上,尽管缓 存的是该变量之后的变量,但是由于没法区分自身变量是否被修改,所以它只能认为自己的缓存失效,重新从 内存中读取。为了能够让CPU尽快从高速缓冲中访问到event-loop变量,有必要让event-loop结构恰好填满一个cache line, 避免重复写回,至于手段上比较简单,即在结构中按照cache line大小填充无意义数组变量。

系统调用分离

在压测过程中,我们发现当PPS达到70w/s之后数据再也上不去了,通过pprof看到是系统调用开销,我们所涉 及到的几个系统调用均为阻塞的,阻塞调用在一定程度上会影响吞吐量,解决办法是再独立出goroutine专门负 责系统调用,避免阻塞event-loop。通过上述的优化内容,我实现了最简代码,在12核(E5-2640 2.5GHz)24G机器上跑出了128w PPS的数据,还 有一些比较细化的优化点,如降低GC频率在此不表,做此类优化的难点在于扣细节,结合pprof和实际测试数 据逐点分析哪部分可能会影响吞吐量,哪种优化方案能有效应对,需要反复对比测试数据,有时候还需要考虑 到代码结构上的实现,文中内容略浅显,表述不当的地方请指正。

本文测试代码在这里:https://github.com/shaoyuan1943/fastudp

作者:shaoyuan1943

来源:GoCN

,