辅助功能:报头发送以及回复
服务端 lib 文件夹下 main.py
def get_header(self):
"""获取报头内容"""
# 解码获取报头长度
header_bytes_size = struct.unpack("q", self.conn.recv(self.HEADER_STRUCT_SIZE))[0]
# 接收报头字符串
header_bytes = self.conn.recv(header_bytes_size)
# 字符串转换为字典
header_dic = json.loads(header_bytes)
return header_dic
def send_response(self, status_code, **kwargs):
"""
服务端返回执行结果
如:要下载的文件大小,状态码等内容,也可以理解为 服务端向客户端发送的报头
"""
# 客户端通过 status_code 判断命令执行结果,如没有找到文件,如果执行和用户预期不同,如没有找到文件,客户端打印 status_msg 信息提示用户
response = {
"status_code": status_code,
"status_msg": ""
}
response["status_msg"] = self.STATUS_CODE.get(status_code)
response.update(**kwargs)
# 字典转换成字符串
response_bytes = json.dumps(response)
# 编码字符串长度并发送
response_bytes_size = struct.pack("q", len(response_bytes))
self.conn.send(response_bytes_size)
# 发送字符串
self.conn.send(response_bytes.encode("utf-8"))
客户端 server 文件夹下 FTPClient.py
def create_header_send(self, action_type, **kwargs):
"""制作报头并发送给服务端,防止粘包"""
# 报头字典
# action_typs 表示发送给服务端后需要执行的函数名字
header_dic = {
"action_type": action_type
}
# 如果调用此函数时,输入的参数有 filename_size = "100", 那么执行 update 方法后,此时 header_dic["file_size"] = "100"
header_dic.update(**kwargs)
# 报头字典转化为 报头字典字符串
header_bytes = json.dumps(header_dic)
# 对字符串长度进行编码
header_bytes_size = struct.pack("q", len(header_bytes))
# 发送编码
self.client.send(header_bytes_size)
# 发送字典字符串
self.client.send(header_bytes.encode("utf-8"))
def get_response(self):
"""获取服务端发送的 response"""
# 解码 response 字符串长度
response_bytes_size = struct.unpack("q", self.client.recv(self.HEADER_STRUCT_SIZE))[0]
# 获取 response 字符串
response_bytes = self.client.recv(response_bytes_size)
# 字符串转换为字典
response = json.loads(response_bytes)
return response
用户认证功能
conf 文件夹下 setting.py 增加部分
# 用户信息文件所在的路径
ACCOUNT_DIR = "%s/conf/accounts.ini" % BASE_DIR
conf 文件夹下 accounts.ini
[aoteman]
username = aoteman
password = b'\xe9\x9a\x18\xc4(\xcb8\xd5\xf2`\x856x\x92.\x03'
服务端 lib 文件夹下 main.py
status_code 增加部分
MSG_SIZE = 8192 # 一次性接收的信息长度
HEADER_STRUCT_SIZE = 8 # 报头字符串编码后的长度默认为 8
# 状态码
STATUS_CODE = {
200: "User login succeeded !", # 用户登陆成功
201: "Error: wrong username or wrong password !", # 错误用户名或密码
202: "Action_type does not exist !" # 没有该函数可以执行
}
def handle(self):
"""处理与客户端的交互指令"""
while True:
header_dic = self.get_header()
action_type = header_dic.get("action_type")
if hasattr(self, "_%s" % action_type): # 如果类中有该函数,执行
func = getattr(self, "_%s" % action_type)
func(header_dic)
else: # 不存在该函数
self.send_response(status_code=202)
def log_msg(self):
"""获取 accounts.ini 文件中的用户信息"""
config_obj = configparser.ConfigParser()
config_obj.read(setting.ACCOUNT_DIR)
# config_obj 结果类似于字典
# key 值为 ini 文件中 【】 部分
print(config_obj.sections())
return config_obj
def authentication_judgment(self, username, password):
"""判断是否存在该用户以及密码是否正确"""
log_msg = self.log_msg() # 获得用户信息
if username in log_msg: # 如果存在该用户
ini_password= log_msg[username]["password"] # ini 文件下的用户经过 md5 加密的密码
# 对用户输入的密码进行 md5 加密并和 ini_password 匹配,判断密码是否正确
# md5 加密
md5_obj = hashlib.md5()
md5_obj.update(password.encode("utf-8"))
md5_password = md5_obj.digest()
# 注意:由于 项目本身并没有创建用户的功能,所以 用户密码 我们可以先对真实密码加密后再写入 ini 文件中
print(md5_password)
if str(md5_password) == str(ini_password): # 如果密码正确
self.current_dir = os.path.join(self.user_home, username)
self.send_response(status_code=200, current_dir = self.current_dir)
else:
self.send_response(status_code=201)
else:
self.send_response(status_code=201)
def _auth(self, header_dic):
"""用户认证"""
username = header_dic.get("username")
password = header_dic.get("password")
self.authentication_judgment(username, password)
客户端 server 文件夹下 FTPClient.py
def auth(self):
"""用户输入用户名,密码发送到服务端,判断是否正确"""
# 三次尝试机会
count = 0
while count < 3:
# 输入用户名,密码
username = input("username: ").strip()
password = input("password: ").strip()
# 发送用户名和密码
self.create_header_send(action_type="auth", username=username, password=password)
# 获取服务端的回复
response = self.get_response()
# 如果登录成功,退出循环
if response.get("status_code") == 200:
self.show_to_client = "[\\%s]>>: " % username
self.current_dir = response.get("current_dir")
return True
else:
print(response.get("status_msg"))
count += 1
return False
def interactive(self):
"""send 交互指令"""
if self.auth(): # 登陆成功
while True:
command = input(self.show_to_client).strip()
if not command:continue
if __name__ == '__main__':
client = Client()
client.interactive()