嘘~ 正在从服务器偷取页面 . . .

多线程FTP(2)—— socketserver 部分源码解读


多线程FTP(2)—— socketserver 部分源码解读

前面我们用 socketserver 演示了 sockeserver 版本的多线程网络通信实现。但实际上我们对于 socketserver 现在处于 “知其然不知其所以然” 的情况,比如说,为什么自己写的类中方法一定要是“handle”?为什么我们只用写通信循环,为什么不用写链接循环?

所有的这些问题其实都可以通过分析 socketserver 的源码来回答。

(pycharm可以通过导入 socket server,并且按住 Ctrl+鼠标点击跳转至模块源码)

socketserver 源码中的类

首先我们看到的是一百多行的灰色的注释,这里的注释其实不需要太过于关注,大概就是对 socket server 模块的摘要和概述(突然想到论文,笑)。

唯一需要注意的是注释中这块对于模块中主要类的表示和关系

'''
        +------------+
        | BaseServer |
        +------------+
              |
              v
        +-----------+        +------------------+
        | TCPServer |------->| UnixStreamServer |
        +-----------+        +------------------+
              |
              v
        +-----------+        +--------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-----------+        +--------------------+
'''
这里显示的是五个类之间的继承关系,箭头代表“继承于”的关系。

然后我们看到了的是socketserver 源码中所有类的名称。
'''
__all__ = ["BaseServer", "TCPServer", "UDPServer",
           "ThreadingUDPServer", "ThreadingTCPServer",
           "BaseRequestHandler", "StreamRequestHandler",
           "DatagramRequestHandler", "ThreadingMixIn"]
'''

之后的代码就都是这些类的实现,在看这些源码之前,我们把所有的类收起,看一下这些类之间的关系。

class BaseServer
class TCPServer(BaseServer)
class UDPServer(TCPServer)
class _Threads(list)
class _NoThreads
class ThreadingMixIn
class ThreadingUDPServer(ThreadingMixIn, UDPServer)
class ThreadingTCPServer(ThreadingMixIn, TCPServer)
class BaseRequestHandler
class StreamRequestHandler(BaseRequestHandler)
class _SocketWriter(BufferedIOBase)
class DatagramRequestHandler(BaseRequestHandler)

'''
在源码中我们可以看到 类似于:
if hasattr(socket, 'AF_UNIX')
if hasattr(os, "fork") 的代码语句

需要说明的是
'AF_UNIX' 和 '"fork"' 的方法适用于 UNIX 系统服务端的搭建
'''

这里是对socketserver 源码中的类关系和功能的一个整理(可能图有点小,看不清可以私信我)。

socketserver 实现多线程 TCP 时的内部代码流程

多线程服务端实现代码

import socketserver

class server(socketserver.BaseRequestHandler):
    def handle(self) -> None: # 一定要是 handle
        print(self.request) # 相当于 conn
        print(self.client_address) # 相当于 addr

        while True: # 通信循环
            msg = self.request.recv(1024)
            print(msg.decode("utf-8"), now_time)
            self.request.send(now_time.encode("utf-8"))

s = socketserver.ThreadingTCPServer(("127.0.0.1", 8080), server) # 开启 TCP服务 多线程
s.serve_forever()

源码分析流程图

黑色部分是 初始化对象的分析流程,即 s = socketserver.ThreadingTCPServer((“127.0.0.1”, 8080), server)。

红色部分和绿色部分分别是 s.serve_forever() 分析时的第一步和第二步。

源码分析具体流程

我们按照我们上面写的服务端代码了解源码内容。

首先运行的代码是这行代码 s = socketserver.ThreadingTCPServer(("127.0.0.1", 8080), server)

我们跳转到源码关于 ThreadingTCPServer 的部分,只有这一行

class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

这说明 ThreadingTCPServer 类本身并不接收任何参数,所以按照类继承的原理,接着跳转 ThreadingMixIn 类,但是 ThreadingMixIn 类只有三个方法

def process_request_thread(self, request, client_address)
def process_request(self, request, client_address)
def server_close(self)

这说明 ThreadingMixIn 类在初始实例化时也不需要传入参数

所以我们接着查看父类 TCPServer 的内容,我们发现了 TCPServer 在初始化的代码

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM
    
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                # 监听连接
                self.server_bind()
                # 监听连接的个数
                self.server_activate()
            except:
                # 关闭连接
                self.server_close()
                raise

我们会发现我们需要传入的参数是

server_address
RequestHandlerClass

和我们的代码对比后就是 server_address = ("127.0.0.1", 8080)
                   RequestHandlerClass = server
    
                    我们也可以打印一下    
                    print(s.server_address)
                    print(s.RequestHandlerClass)

在这里我们也发现了其实 socket 的初始实例化就是在 TCPServer 类中实现的,这里我们就明白了为什么我们在使用 socketserver 时不需要实例化 socket了。

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM
    
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        ...
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        ...
    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    
    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

同时我们发现 TCPServer 在初始化的时候,也初识化了 BaseServer

BaseServer.__init__(self, server_address, RequestHandlerClass)

所以同时我们跳转至 BaseServer 类

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass # 这句话值得注意,之后要用。之前有说RequestHandlerClass = server,所以在这里 self.RequestHandlerClass = server
        self.__is_shut_down = threading.Event() # 跟多线程有关
        self.__shutdown_request = False

分析完 s = socketserver.ThreadingTCPServer((“127.0.0.1”, 8080), server) 这句代码后,然后是 s.serve_forever 这句代码了。

还是和前面一下的方法,先找 ThreadingTCPServer 类 中是否有这个方法,然后是父类ThreadingMixIn 类和 TCPServer 类是否存在此方法,然后再回溯父类的父类。

最后我们在 BaseServer 类中找到了这个方法

 def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            '''
            XXX: Consider using another file descriptor or connecting to the socket to wake this up instead of polling. Polling reduces our responsiveness to a shutdown request and wastes cpu at all other times.
            '''
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

很复杂的代码,叫人摸不着头脑,看不懂没关系,我们可以看注释呀

"""
大概是说处理请求
Handle one request at a time until shutdown.

是说处理请求的周期,以及说如果要处理有规律的周期性请求使用另一个线程。
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""

从注释来看,serve_forever 方法是和请求处理、多线程有关的方法。

此外,BaseServer 类serve_forever方法中有一行代码值得注意,是

if ready:
    self._handle_request_noblock()

如果我们跳转到方法 _handle_request_noblock(),也会发现一些很有意思的代码

request, client_address = self.get_request()
....
self.process_request(request, client_address)

这时候我们发现了一个我们在自己定义类中出现过的两个变量 request, client_address,那么这两个变量是什么呢?我们接着跳转到 方法 get_request。( get_request方法是 TCPServer 类中,也挺合理的,毕竟 TCPServer 继承于BaseServer,但还是觉得源码的结构挺牛的)。

然后我们就发现了我们很熟悉的代码

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept() # 熟悉的通信连接

然后我们回到 self.process_request(request, client_address)代码,跳转

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

再跳转到 finish_request 方法

def finish_request(self, request, client_address):
    """Finish one request by instantiating RequestHandlerClass."""
    self.RequestHandlerClass(request, client_address, self)

这里我们又看到了一个很熟悉的代码 self.RequestHandlerClass,这一行我们又回到了一开始初始化的部分

 RequestHandlerClass = server
 self.RequestHandlerClass = RequestHandlerClass
所以
self.RequestHandlerClass(request, client_address, self)
== server(request, client_address, self)

注意,这里 server(request, client_address, self) 中的 self 是指我们初始实例化的 s,也就是 ThreadingTCPServer类。我们也可以打印一下。

因为在 BaseRequestHandler初始化时,有
def __init__(self, request, client_address, server):
        ...
        self.server = server
        ...
所以我们应该打印的是 self.server

分析到这里,代码就运行到我们自己定义的类的部分了,同时到这里我们基本了解了这两行代码的内部原理,可以说这两行代码完成了多线程和连接循环

s = socketserver.ThreadingTCPServer(("127.0.0.1", 8080), server)
s.serve_forever()

了解了这些,我们就可以再来看看我们自己写的类了,

class server(socketserver.BaseRequestHandler):
    def handle(self) -> None: # 一定要是 handle
        print(self.request) # 相当于 conn
        print(self.client_address) # 相当于 addr

很显然这时候我们需要跳转到 BaseRequestHandler 类

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define other arbitrary instance variables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

这段代码就比较简单了,主要是帮助我们了解了为什么我们一定要定义一个 handle 的方法,说实话我们也可以覆盖初始代码,比如说把 handle 改成其他的名称


文章作者: New Ass
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 New Ass !
  目录