多线程FTP(2)—— socketserver 部分源码解读
前面我们用 socketserver 演示了 sockeserver 版本的多线程网络通信实现。但实际上我们对于 socketserver 现在处于 “知其然不知其所以然” 的情况,比如说,为什么自己写的类中方法一定要是“handle”?为什么我们只用写通信循环,为什么不用写链接循环?
所有的这些问题其实都可以通过分析 socketserver 的源码来回答。
(pycharm可以通过导入 socket server,并且按住 Ctrl+鼠标点击跳转至模块源码)
socketserver 源码中的类
首先我们看到的是一百多行的灰色的注释,这里的注释其实不需要太过于关注,大概就是对 socket server 模块的摘要和概述(突然想到论文,笑)。
唯一需要注意的是注释中这块对于模块中主要类的表示和关系:
'''
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
'''
这里显示的是五个类之间的继承关系,箭头代表“继承于”的关系。
然后我们看到了的是socketserver 源码中所有类的名称。
'''
__all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "ThreadingTCPServer",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
'''
之后的代码就都是这些类的实现,在看这些源码之前,我们把所有的类收起,看一下这些类之间的关系。
class BaseServer
class TCPServer(BaseServer)
class UDPServer(TCPServer)
class _Threads(list)
class _NoThreads
class ThreadingMixIn
class ThreadingUDPServer(ThreadingMixIn, UDPServer)
class ThreadingTCPServer(ThreadingMixIn, TCPServer)
class BaseRequestHandler
class StreamRequestHandler(BaseRequestHandler)
class _SocketWriter(BufferedIOBase)
class DatagramRequestHandler(BaseRequestHandler)
'''
在源码中我们可以看到 类似于:
if hasattr(socket, 'AF_UNIX')
if hasattr(os, "fork") 的代码语句
需要说明的是
'AF_UNIX' 和 '"fork"' 的方法适用于 UNIX 系统服务端的搭建
'''
这里是对socketserver 源码中的类关系和功能的一个整理(可能图有点小,看不清可以私信我)。
socketserver 实现多线程 TCP 时的内部代码流程
多线程服务端实现代码
import socketserver
class server(socketserver.BaseRequestHandler):
def handle(self) -> None: # 一定要是 handle
print(self.request) # 相当于 conn
print(self.client_address) # 相当于 addr
while True: # 通信循环
msg = self.request.recv(1024)
print(msg.decode("utf-8"), now_time)
self.request.send(now_time.encode("utf-8"))
s = socketserver.ThreadingTCPServer(("127.0.0.1", 8080), server) # 开启 TCP服务 多线程
s.serve_forever()
源码分析流程图
黑色部分是 初始化对象的分析流程,即 s = socketserver.ThreadingTCPServer((“127.0.0.1”, 8080), server)。
红色部分和绿色部分分别是 s.serve_forever() 分析时的第一步和第二步。
源码分析具体流程
我们按照我们上面写的服务端代码了解源码内容。
首先运行的代码是这行代码 s = socketserver.ThreadingTCPServer(("127.0.0.1", 8080), server)
我们跳转到源码关于 ThreadingTCPServer 的部分,只有这一行
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
这说明 ThreadingTCPServer 类本身并不接收任何参数,所以按照类继承的原理,接着跳转 ThreadingMixIn 类,但是 ThreadingMixIn 类只有三个方法
def process_request_thread(self, request, client_address)
def process_request(self, request, client_address)
def server_close(self)
这说明 ThreadingMixIn 类在初始实例化时也不需要传入参数
所以我们接着查看父类 TCPServer 的内容,我们发现了 TCPServer 在初始化的代码
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
# 监听连接
self.server_bind()
# 监听连接的个数
self.server_activate()
except:
# 关闭连接
self.server_close()
raise
我们会发现我们需要传入的参数是
server_address
RequestHandlerClass
和我们的代码对比后就是 server_address = ("127.0.0.1", 8080)
RequestHandlerClass = server
我们也可以打印一下
print(s.server_address)
print(s.RequestHandlerClass)
在这里我们也发现了其实 socket 的初始实例化就是在 TCPServer 类中实现的,这里我们就明白了为什么我们在使用 socketserver 时不需要实例化 socket了。
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
...
self.socket = socket.socket(self.address_family,
self.socket_type)
...
def server_bind(self):
"""Called by constructor to bind the socket.
May be overridden.
"""
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname()
def server_activate(self):
"""Called by constructor to activate the server.
May be overridden.
"""
self.socket.listen(self.request_queue_size)
def server_close(self):
"""Called to clean-up the server.
May be overridden.
"""
self.socket.close()
同时我们发现 TCPServer 在初始化的时候,也初识化了 BaseServer
BaseServer.__init__(self, server_address, RequestHandlerClass)
所以同时我们跳转至 BaseServer 类
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass # 这句话值得注意,之后要用。之前有说RequestHandlerClass = server,所以在这里 self.RequestHandlerClass = server
self.__is_shut_down = threading.Event() # 跟多线程有关
self.__shutdown_request = False
分析完 s = socketserver.ThreadingTCPServer((“127.0.0.1”, 8080), server) 这句代码后,然后是 s.serve_forever 这句代码了。
还是和前面一下的方法,先找 ThreadingTCPServer 类 中是否有这个方法,然后是父类ThreadingMixIn 类和 TCPServer 类是否存在此方法,然后再回溯父类的父类。
最后我们在 BaseServer 类中找到了这个方法
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
'''
XXX: Consider using another file descriptor or connecting to the socket to wake this up instead of polling. Polling reduces our responsiveness to a shutdown request and wastes cpu at all other times.
'''
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
while not self.__shutdown_request:
ready = selector.select(poll_interval)
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request:
break
if ready:
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
很复杂的代码,叫人摸不着头脑,看不懂没关系,我们可以看注释呀
"""
大概是说处理请求
Handle one request at a time until shutdown.
是说处理请求的周期,以及说如果要处理有规律的周期性请求使用另一个线程。
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
从注释来看,serve_forever 方法是和请求处理、多线程有关的方法。
此外,BaseServer 类serve_forever方法中有一行代码值得注意,是
if ready:
self._handle_request_noblock()
如果我们跳转到方法 _handle_request_noblock(),也会发现一些很有意思的代码
request, client_address = self.get_request()
....
self.process_request(request, client_address)
这时候我们发现了一个我们在自己定义类中出现过的两个变量 request, client_address,那么这两个变量是什么呢?我们接着跳转到 方法 get_request。( get_request方法是 TCPServer 类中,也挺合理的,毕竟 TCPServer 继承于BaseServer,但还是觉得源码的结构挺牛的)。
然后我们就发现了我们很熟悉的代码
def get_request(self):
"""Get the request and client address from the socket.
May be overridden.
"""
return self.socket.accept() # 熟悉的通信连接
然后我们回到 self.process_request(request, client_address)代码,跳转
def process_request(self, request, client_address):
"""Call finish_request.
Overridden by ForkingMixIn and ThreadingMixIn.
"""
self.finish_request(request, client_address)
self.shutdown_request(request)
再跳转到 finish_request 方法
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)
这里我们又看到了一个很熟悉的代码 self.RequestHandlerClass,这一行我们又回到了一开始初始化的部分
RequestHandlerClass = server
self.RequestHandlerClass = RequestHandlerClass
所以
self.RequestHandlerClass(request, client_address, self)
== server(request, client_address, self)
注意,这里 server(request, client_address, self) 中的 self 是指我们初始实例化的 s,也就是 ThreadingTCPServer类。我们也可以打印一下。
因为在 BaseRequestHandler初始化时,有
def __init__(self, request, client_address, server):
...
self.server = server
...
所以我们应该打印的是 self.server
分析到这里,代码就运行到我们自己定义的类的部分了,同时到这里我们基本了解了这两行代码的内部原理,可以说这两行代码完成了多线程和连接循环
s = socketserver.ThreadingTCPServer(("127.0.0.1", 8080), server)
s.serve_forever()
了解了这些,我们就可以再来看看我们自己写的类了,
class server(socketserver.BaseRequestHandler):
def handle(self) -> None: # 一定要是 handle
print(self.request) # 相当于 conn
print(self.client_address) # 相当于 addr
很显然这时候我们需要跳转到 BaseRequestHandler 类
class BaseRequestHandler:
"""Base class for request handler classes.
This class is instantiated for each request to be handled. The
constructor sets the instance variables request, client_address
and server, and then calls the handle() method. To implement a
specific service, all you need to do is to derive a class which
defines a handle() method.
The handle() method can find the request as self.request, the
client address as self.client_address, and the server (in case it
needs access to per-server information) as self.server. Since a
separate instance is created for each request, the handle() method
can define other arbitrary instance variables.
"""
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
这段代码就比较简单了,主要是帮助我们了解了为什么我们一定要定义一个 handle 的方法,说实话我们也可以覆盖初始代码,比如说把 handle 改成其他的名称