嘘~ 正在从服务器偷取页面 . . .

网络通信编程学习(8)/ FTP项目(2) —— 项目辅助功能以及用户认证功能


辅助功能:报头发送以及回复

服务端 lib 文件夹下 main.py

    def get_header(self):
        """获取报头内容"""
        # 解码获取报头长度
        header_bytes_size = struct.unpack("q", self.conn.recv(self.HEADER_STRUCT_SIZE))[0]

        # 接收报头字符串
        header_bytes = self.conn.recv(header_bytes_size)

        # 字符串转换为字典
        header_dic = json.loads(header_bytes)

        return header_dic

    def send_response(self, status_code, **kwargs):
        """
        服务端返回执行结果
            如:要下载的文件大小,状态码等内容,也可以理解为 服务端向客户端发送的报头
        """
        # 客户端通过 status_code 判断命令执行结果,如没有找到文件,如果执行和用户预期不同,如没有找到文件,客户端打印 status_msg 信息提示用户
        response = {
            "status_code": status_code,
            "status_msg": ""
        }
        response["status_msg"] = self.STATUS_CODE.get(status_code)
        response.update(**kwargs)

        # 字典转换成字符串
        response_bytes = json.dumps(response)

        # 编码字符串长度并发送
        response_bytes_size = struct.pack("q", len(response_bytes))
        self.conn.send(response_bytes_size)

        # 发送字符串
        self.conn.send(response_bytes.encode("utf-8"))

客户端 server 文件夹下 FTPClient.py

    def create_header_send(self, action_type, **kwargs):
        """制作报头并发送给服务端,防止粘包"""
        # 报头字典
        # action_typs 表示发送给服务端后需要执行的函数名字
        header_dic = {
            "action_type": action_type
        }
        # 如果调用此函数时,输入的参数有 filename_size = "100", 那么执行 update 方法后,此时 header_dic["file_size"] = "100"
        header_dic.update(**kwargs)

        # 报头字典转化为 报头字典字符串
        header_bytes = json.dumps(header_dic)

        # 对字符串长度进行编码
        header_bytes_size = struct.pack("q", len(header_bytes))

        # 发送编码
        self.client.send(header_bytes_size)

        # 发送字典字符串
        self.client.send(header_bytes.encode("utf-8"))

    def get_response(self):
        """获取服务端发送的 response"""
        # 解码 response 字符串长度
        response_bytes_size = struct.unpack("q", self.client.recv(self.HEADER_STRUCT_SIZE))[0]

        # 获取 response 字符串
        response_bytes = self.client.recv(response_bytes_size)

        # 字符串转换为字典
        response = json.loads(response_bytes)

        return response

用户认证功能

conf 文件夹下 setting.py 增加部分

# 用户信息文件所在的路径
ACCOUNT_DIR = "%s/conf/accounts.ini" % BASE_DIR

conf 文件夹下 accounts.ini



[aoteman]
username = aoteman
password = b'\xe9\x9a\x18\xc4(\xcb8\xd5\xf2`\x856x\x92.\x03'

服务端 lib 文件夹下 main.py

status_code 增加部分

    MSG_SIZE = 8192  # 一次性接收的信息长度

    HEADER_STRUCT_SIZE = 8  # 报头字符串编码后的长度默认为 8

    # 状态码
    STATUS_CODE = {
        200: "User login succeeded !", # 用户登陆成功
        201: "Error: wrong username or wrong password !", # 错误用户名或密码
        202: "Action_type does not exist !" # 没有该函数可以执行
    }
    def handle(self):
        """处理与客户端的交互指令"""
        while True:
            header_dic = self.get_header()
            action_type = header_dic.get("action_type")

            if hasattr(self, "_%s" % action_type): # 如果类中有该函数,执行
                func = getattr(self, "_%s" % action_type)
                func(header_dic)
            else: # 不存在该函数
                self.send_response(status_code=202)

    def log_msg(self):
        """获取 accounts.ini 文件中的用户信息"""
        config_obj = configparser.ConfigParser()
        config_obj.read(setting.ACCOUNT_DIR)

        # config_obj 结果类似于字典
        # key 值为 ini 文件中 【】 部分
        print(config_obj.sections())
        return config_obj

    def authentication_judgment(self, username, password):
        """判断是否存在该用户以及密码是否正确"""
        log_msg = self.log_msg() # 获得用户信息
        if username in log_msg: # 如果存在该用户
            ini_password= log_msg[username]["password"] # ini 文件下的用户经过 md5 加密的密码

            # 对用户输入的密码进行 md5 加密并和 ini_password 匹配,判断密码是否正确
            # md5 加密
            md5_obj = hashlib.md5()
            md5_obj.update(password.encode("utf-8"))
            md5_password = md5_obj.digest()
            # 注意:由于 项目本身并没有创建用户的功能,所以 用户密码 我们可以先对真实密码加密后再写入 ini 文件中
            print(md5_password)

            if str(md5_password) == str(ini_password): # 如果密码正确
                self.current_dir = os.path.join(self.user_home, username)
                self.send_response(status_code=200, current_dir = self.current_dir)
            else:
                self.send_response(status_code=201)
        else:
            self.send_response(status_code=201)

    def _auth(self, header_dic):
        """用户认证"""
        username = header_dic.get("username")
        password = header_dic.get("password")

        self.authentication_judgment(username, password)

客户端 server 文件夹下 FTPClient.py

    def auth(self):
        """用户输入用户名,密码发送到服务端,判断是否正确"""
        # 三次尝试机会
        count = 0
        while count < 3:
            # 输入用户名,密码
            username = input("username: ").strip()
            password = input("password: ").strip()
            # 发送用户名和密码
            self.create_header_send(action_type="auth", username=username, password=password)

            # 获取服务端的回复
            response = self.get_response()

            # 如果登录成功,退出循环
            if response.get("status_code") == 200:
                self.show_to_client = "[\\%s]>>: " % username
                self.current_dir = response.get("current_dir")
                return True
            else:
                print(response.get("status_msg"))

            count += 1
        return False

    def interactive(self):
        """send 交互指令"""
        if self.auth(): # 登陆成功
            while True:
                command = input(self.show_to_client).strip()
                if not command:continue

if __name__ == '__main__':
    client = Client()
    client.interactive()

运行结果

服务端

客户端


文章作者: New Ass
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 New Ass !
  目录