阅读量:0
开发自己的 Web 框架
接收web服务器的动态资源请求,给web服务器提供处理动态资源请求的服务。根据请求资源路径的后缀名进行判断
- 如果请求资源路径的后缀名是.html则是动态资源请求,让web框架程序进行处理。
- 否则是静态资源请求,让web服务器程序进行处理。
开发Web服务器主体程序
- 接受客户端 HTTP 请求(底层是 TCP)
- 判断请求是否是静态资源还是动态资源
- 如果是静态资源怎么处理
- 如果是动态资源怎么处理
- 关闭 Web 服务器
""" #!/usr/bin/python3 # coding:utf-8 # # Copyright (C) 2024 - 2024 Jasonakeke, Inc. All Rights Reserved # @Desc :我们自己开发的 web 服务器 # @Time : 2024/7/31 22:17 # @Author : Code_By_Jasonakeke # @Email : 2284037977@qq.com # @File : my_web.py # @IDE : PyCharm """ from socket import socket, AF_INET, SOCK_STREAM from threading import Thread from time import strftime, localtime from _socket import SOL_SOCKET, SO_REUSEADDR from MyFramework import handle_request # 开发自己的 web 服务器主类 class MyWebHttpServer(object): def __init__(self, port): # 创建 HTTP 服务器的套接字 server_socket = socket(AF_INET, SOCK_STREAM) # 设置端口号复用,出现退出之后,不需要等待几分钟,直接释放端口 server_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, True) server_socket.bind(('', port)) server_socket.listen(128) self.server_socket = server_socket # 处理请求的函数 @staticmethod def hande_browser_request(new_socket): # 接收客户端的请求 recv_data = new_socket.recv(4096) # 如果没有收到数据,那么请求无效,关闭套接字,直接退出 if len(recv_data) == 0: new_socket.close() return # 对接收的字节数据,转换成字符 request_data = recv_data.decode('utf-8') print("浏览器请求的数据:", request_data) request_array = request_data.split(' ', maxsplit = 2) # 得到请求路径 request_path = request_array[1] print("请求路径是:", request_path) # 如果请求路径是 /,自动设置为 /index.html if request_path == '/': request_path = '/index.html' # 根据请求路径来判断是否是动态资源函数静态资源 if request_path.endswith('.html'): ''' 动态资源的请求 ''' # 动态资源的处理交给 Web 框架来处理,需要把请求参数传给 Web 框架,可能会有多个参数,所以采用字典结果 params = { 'request_path': request_path } # Web 框架处理动态资源请求之后,返回一个响应 response = handle_request(params) new_socket.send(response) new_socket.close() else: ''' 静态资源的请求 ''' # 响应主体页面 response_body = None # 响应头 response_header = None response_type = 'text/html' # 响应头第一行 response_first_line = None # 其实就是根据请求路径读取 /static/ 目录中静态的文件数据,响应给客户端 try: # 读取 static 目录中的文件数据,rb 模式是一种兼容模式,可以打开图片,也可以打开 js with open('static' + request_path, 'rb') as f: response_body = f.read() if request_path.endswith('.jpg'): response_type = 'image/wbep' response_first_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_header = ('Content-Length: ' + str(len(response_body)) + '\r\n' + 'Content-Type: ' + response_type + '; charset=iso-8859-1\r\n' + 'Date: ' + strftime('%Y-%m-%d %H:%M:%S', localtime()) + '\r\n' + 'Server: keke\r\n') except Exception as e: # 浏览器想读取的文件可能不存在 with open('static/404.html', 'rb') as f: # 响应主体页面 response_body = f.read() response_first_line = 'HTTP/1.1 404 Not Found\r\n' # 响应头 response_header = ('Content-Length: ' + str(len(response_body)) + '\r\n' + 'Content-Type: ' + response_type + '; charset=iso-8859-1\r\n' + 'Date: ' + strftime('%Y-%m-%d %H:%M:%S', localtime()) + '\r\n' + 'Server: keke\r\n') finally: # 组成响应数据,发给客户端 response = (response_first_line + response_header + '\r\n').encode('utf-8') + response_body new_socket.send(response) # 关闭套接字 new_socket.close() # 启动服务器 ,并且接收客户端的请求 def start(self): # 循环并且多线程来接收客户端的请求 while True: new_socket, ip_port = self.server_socket.accept() print("客户端的 ip 和端口是:", ip_port) # 一个客户端请求交给一个线程处理 sub_thread = Thread(target = self.hande_browser_request, args = (new_socket,)) # 设置守护线程 sub_thread.daemon = True sub_thread.start() def main(): # 创建服务器对象 web_server = MyWebHttpServer(8080) # 启动服务器 web_server.start() if __name__ == '__main__': main()
开发Web框架程序
- 根据请求路径,动态响应对应的数据
- 如果请求路径,没有对应的响应数据也需要返回404页面
""" # coding:utf-8 # # Copyright (C) 2024 - 2024 Jasonakeke, Inc. All Rights Reserved # @Desc :自定义 Web 框架 # @Time : 2024/8/1 22:07 # @Author : Code_By_Jasonakeke # @Email : 2284037977@qq.com # @File : MyFramework.py # @IDE : PyCharm """ from time import strftime, localtime # 处理动态资源请求 def handle_request(params): request_path = params['request_path'] # 当前的请求路径有与之对应的动态响应 if request_path == '/index.html': response = index() return response else: # 没有动态资源的数据,返回 404页面 return page_not_found() # 专门处理 index.html 的请求 def index(): data = strftime('%Y-%m-%d %H:%M:%S', localtime()) response_body = data response_first_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_header = ('Content-Length: ' + str(len(response_body)) + '\r\n' + 'Content-Type: text/html; charset=iso-8859-1\r\n' + 'Date: ' + strftime('%Y-%m-%d %H:%M:%S', localtime()) + '\r\n' + 'Server: keke\r\n') response = (response_first_line + response_header + '\r\n' + response_body).encode('utf-8') return response # 处理没有找到对应的动态资源 def page_not_found(): # 浏览器想读取的文件可能不存在 with open('static/404.html', 'rb') as f: # 响应主体页面 response_body = f.read() response_first_line = 'HTTP/1.1 404 Not Found\r\n' # 响应头 response_header = ('Content-Length: ' + str(len(response_body)) + '\r\n' + 'Content-Type: text/html; charset=iso-8859-1\r\n' + 'Date: ' + strftime('%Y-%m-%d %H:%M:%S', localtime()) + '\r\n' + 'Server: keke\r\n') response = (response_first_line + response_header + '\r\n').encode('utf-8') + response_body return response
使用模板来展示响应内容
- 自己设计一个模板,中有一些地方采用动态的数据替代
- 怎么替代,替代什么数据
<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"> <title>首页 - 电影列表</title> <link href="/css/bootstrap.min.css" rel="stylesheet"> <script src="/js/jquery-1.12.4.min.js"></script> <script src="/js/bootstrap.min.js"></script> </head> <body> <div class="navbar navbar-inverse navbar-static-top "> <div class="container"> <div class="navbar-header"> <button class="navbar-toggle" data-toggle="collapse" data-target="#mymenu"> <span class="icon-bar"></span> <span class="icon-bar"></span> <span class="icon-bar"></span> </button> <a href="#" class="navbar-brand">电影列表</a> </div> <div class="collapse navbar-collapse" id="mymenu"> <ul class="nav navbar-nav"> <li class="active"><a href="">电影信息</a></li> <li><a href="">个人中心</a></li> </ul> </div> </div> </div> <div class="container"> <div class="container-fluid"> <table class="table table-hover"> <tr> <th>序号</th> <th>名称</th> <th>导演</th> <th>上映时间</th> <th>票房</th> <th>电影时长</th> <th>类型</th> <th>备注</th> <th>删除电影</th> </tr> {%datas%} </table> </div> </div> </body> </html>
开发框架的路由列表功能
- 以后开发新的动作资源的功能只需要增加一个条件判断分支和一个专门处理的函数
- 路由:就是请求的 URL 路径和处理函数直接的映射
- 路由表
请求路径 处理函数 /index.html index /userinfo.html user_info
注意:用户的动态资源请求通过遍历路由表找到对应的处理函数来完成的
""" #!/usr/bin/python3 # coding:utf-8 # # Copyright (C) 2024 - 2024 Jasonakeke, Inc. All Rights Reserved # @Desc :自定义 Web 框架 # @Time : 2024/8/1 22:07 # @Author : Code_By_Jasonakeke # @Email : 2284037977@qq.com # @File : MyFramework.py # @IDE : PyCharm """ from time import strftime, localtime # 处理动态资源请求 def handle_request(params): request_path = params['request_path'] for path, func in route_list: if request_path == path: return func() else: return page_not_found() # 当前的请求路径有与之对应的动态响应 # if request_path == '/index.html': # response = index() # return response # elif request_path == '/userinfo.html': # response = user_info() # return response # else: # return page_not_found() # 专门处理 index.html 的请求 def index(): response_body = None date = strftime('%Y-%m-%d %H:%M:%S', localtime()) # response_body = data with open('template/index.html', 'r', encoding = 'utf-8') as f: response_body = f.read() response_body = response_body.replace('{%datas%}', date) response_first_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n' + response_body).encode('utf-8') return response # 处理没有找到对应的动态资源 def page_not_found(): # 浏览器想读取的文件可能不存在 with open('static/404.html', 'rb') as f: # 响应主体页面 response_body = f.read() response_first_line = 'HTTP/1.1 404 Not Found\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n').encode('utf-8') + response_body return response def user_info(): response_body = None date = strftime('%Y-%m-%d %H:%M:%S', localtime()) # response_body = data with open('template/user_info.html', 'r', encoding = 'utf-8') as f: response_body = f.read() response_body = response_body.replace('{%datas%}', date) response_first_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n' + response_body).encode('utf-8') return response # 定义路由表 route_list = { ('/index.html', index), ('/userinfo.html', user_info), }
采用装饰器的方式添加路由
- 采用带参数的装饰器
- 在任何一个处理函数的基础上增加一个添加路由的功能
""" #!/usr/bin/python3 # coding:utf-8 # # Copyright (C) 2024 - 2024 Jasonakeke, Inc. All Rights Reserved # @Desc :自定义 Web 框架 # @Time : 2024/8/1 22:07 # @Author : Code_By_Jasonakeke # @Email : 2284037977@qq.com # @File : MyFramework.py # @IDE : PyCharm """ from functools import wraps from time import strftime, localtime # 定义路由表 route_list = [] # route_list = { # ('/index.html', index), # ('/userinfo.html', user_info), # } # 调用一个带参数的装饰器 def route(request_path): """ :param request_path: URL 请求 :return: """ def add_route(func): # 添加路由到路由表 route_list.append((request_path, func)) @wraps(func) def invoke(*arg, **kwargs): # 调用指定的处理函数并返回 return func(*arg, **kwargs) return invoke return add_route # 处理动态资源请求 def handle_request(params): request_path = params['request_path'] for path, func in route_list: if request_path == path: return func() else: return page_not_found() # 当前的请求路径有与之对应的动态响应 # if request_path == '/index.html': # response = index() # return response # elif request_path == '/userinfo.html': # response = user_info() # return response # else: # return page_not_found() # 专门处理 index.html 的请求 @route('/index.html') def index(): response_body = None date = strftime('%Y-%m-%d %H:%M:%S', localtime()) # response_body = data with open('template/index.html', 'r', encoding = 'utf-8') as f: response_body = f.read() response_body = response_body.replace('{%datas%}', date) response_first_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n' + response_body).encode('utf-8') return response # 处理没有找到对应的动态资源 def page_not_found(): # 浏览器想读取的文件可能不存在 with open('static/404.html', 'rb') as f: # 响应主体页面 response_body = f.read() response_first_line = 'HTTP/1.1 404 Not Found\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n').encode('utf-8') + response_body return response @route('/userinfo.html') def user_info(): response_body = None date = strftime('%Y-%m-%d %H:%M:%S', localtime()) # response_body = data with open('template/user_info.html', 'r', encoding = 'utf-8') as f: response_body = f.read() response_body = response_body.replace('{%datas%}', date) response_first_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n' + response_body).encode('utf-8') return response
小结:使用带参数的装饰器,可以把路由自动的添加到路由列表中
电影列表页面的开发案例
- 查询数据
- 根据查询的数据得到动态的内容
""" #!/usr/bin/python3 # coding:utf-8 # # Copyright (C) 2024 - 2024 Jasonakeke, Inc. All Rights Reserved # @Desc :自定义 Web 框架 # @Time : 2024/8/1 22:07 # @Author : Code_By_Jasonakeke # @Email : 2284037977@qq.com # @File : MyFramework.py # @IDE : PyCharm """ from functools import wraps from time import strftime, localtime from pymysql import connect # 定义路由表 route_list = [] # route_list = { # ('/index.html', index), # ('/userinfo.html', user_info), # } # 调用一个带参数的装饰器 def route(request_path): """ :param request_path: URL 请求 :return: """ def add_route(func): # 添加路由到路由表 route_list.append((request_path, func)) @wraps(func) def invoke(*arg, **kwargs): # 调用指定的处理函数并返回 return func(*arg, **kwargs) return invoke return add_route # 处理动态资源请求 def handle_request(params): request_path = params['request_path'] for path, func in route_list: if request_path == path: return func() else: return page_not_found() # 当前的请求路径有与之对应的动态响应 # if request_path == '/index.html': # response = index() # return response # elif request_path == '/userinfo.html': # response = user_info() # return response # else: # return page_not_found() # 专门处理 index.html 的请求 @route('/index.html') def index(): response_body = None # date = strftime('%Y-%m-%d %H:%M:%S', localtime()) # response_body = data # 1.从 MySQL 中查询数据 conn = connect(host = '127.0.0.1', port = 3306, user = 'root', password = '123456', database = 'test', charset = 'utf8') cursor = conn.cursor() cursor.execute('select * from t_movies') result = cursor.fetchall() print(result) datas = '' for row in result: datas += ''' <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s元</td> <td>%s</td> <td>%s</td> <td>%s</td> <td> <input type='button' value='删除'/></td> </tr> ''' % row print(datas) with open('template/index.html', 'r', encoding = 'utf-8') as f: response_body = f.read() response_body = response_body.replace('{%datas%}', datas) response_first_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n' + response_body).encode('utf-8') return response # 处理没有找到对应的动态资源 def page_not_found(): # 浏览器想读取的文件可能不存在 with open('static/404.html', 'rb') as f: # 响应主体页面 response_body = f.read() response_first_line = 'HTTP/1.1 404 Not Found\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n').encode('utf-8') + response_body return response @route('/userinfo.html') def user_info(): response_body = None date = strftime('%Y-%m-%d %H:%M:%S', localtime()) # response_body = data with open('template/user_info.html', 'r', encoding = 'utf-8') as f: response_body = f.read() response_body = response_body.replace('{%datas%}', date) response_first_line = 'HTTP/1.1 200 OK\r\n' # 响应头 response_header = 'Server: keke\r\n' response = (response_first_line + response_header + '\r\n' + response_body).encode('utf-8') return response