(资料图)
# 3 local对象-并发编程中的一个对象,它可以保证多线程并发访问数据安全 -本质原理是:不同的线程,操作的是自己的数据 -不支持协程 # 4 自己定义local,支持线程和协程# 注意点一: try: # 只要解释器没有装greenlet,这句话就会报错 # 一旦装了,有两种情况,使用了协程和没用协程,无论使用不使用,用getcurrent都能拿到协程id号 from greenlet import getcurrent as get_ident except Exception as e: from threading import get_ident# 注意点二:重写类的 __setattr__和__getattr__ 对象.属性 取值 不存在会触发 __getattr__ 对象.属性 设置值 不存在时会触发 __setattr__ # 注意点三:由于重写了__setattr__和__getattr__类内部使用 self.storage 会递归 使用类调用对象的方法,它就是普通函数,有几个值传几个值 object.__setattr__(self, "storage", {}) 等同于:self.storage={} 等价于:setattr(self,"storage", {}) 会递归 # 5 flask是如何实现这个local类的 def __setattr__(self, name, value): ident = self.__ident_func__() storage = self.__storage__ try: storage[ident][name] = value except KeyError: storage[ident] = {name: value} def __getattr__(self, name): try: return self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name) def __getattr__(self, k): ident = get_ident() return self.storage[ident][k] def __setattr__(self, k, v): ident = get_ident() #如果用协程,这就是协程号,如果是线程,这就是线程号 if ident in self.storage: #{"协程id号":{arg:1},"协程id号":{arg:2},"协程id号":{arg:3}} self.storage[ident][k] = v else: self.storage[ident] = {k: v} # 6 偏函数 :提前传值,返回一个对象,后期可以调用这个对象,传入后续的值# 7 请求上下文源码分析(ctx 对象),整个flask的执行流程-一旦请求来了----》会执行 Flask类的对象app()---》触发Flask __call__--->self.wsgi_app(environ, start_response) -Flask类wsgi_app 方法 大约 2417行 def wsgi_app(self, environ, start_response): #1 返回了一个ctx,请求上下文对象,RequestContext 的对象,里面有session,request ctx = self.request_context(environ) try: try: # 2 ctx.push---->RequestContext的push---》382行 # _request_ctx_stack.push(self)--self是ctx---》是全局变量 # 是LocalStack()的对象 ctx.push() # 匹配路由执行视图函数,请求扩展 response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: # noqa: B001 error = sys.exc_info()[1] raise # 把结果返回给wsgi服务器 return response(environ, start_response) finally: if self.should_ignore_error(error): error = None # 把当前放进去的ctx剔除,当次请求结束了 ctx.auto_pop(error)-是LocalStack()的对象 的push ,传入了ctx def push(self, obj): # self._local是 Flask自己定义的兼容线程和协程的Local #self._local中反射 stack,会根据不同线程或协程,返回不同线程的stack #rv是None的 rv = getattr(self._local, "stack", None) if rv is None: # rv=[] # self._local.stack=rv #self._local={"协程id号1":{stack:[]},"协程id号2":{stack:[]}} self._local.stack = rv = [] rv.append(obj) #self._local={"协程id号1":{stack:[ctx,]},"协程id号2":{stack:[]}} return rv 在视图函数中:request = LocalProxy(partial(_lookup_req_object, "request")) print(request.method) # 执行requets对象的 __getattr__ LocalProxy的__getattr__-->核心: return getattr(self._get_current_object(), name) self._get_current_object() 是 ctx中的真正request对象,那method,自如就拿到当次请求的method def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): #object.__setattr__(self, "_LocalProxy__local", local),初始化传入的 # local 是 partial(_lookup_req_object, "request") # # getattr(_lookup_req_object("request"), "method") # getattr(当次请求的reuqest, "method") return self.__local() # self中的 __local,隐藏属性 try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__) def _lookup_req_object(name): # 这里把当前线程下 的ctx取出来了 top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_err_msg) return getattr(top, name) # 去ctx中反射request,返回的就是当次请求的requets # django flask 同步框架,部署的时候,使用uwsgi部署,uwsgi是进程线程架构,并发量不高# 可以通过uwsgi+gevent,部署成异步程序# Flask框架中的信号基于blinker(安装这个模块),其主要就是让开发者可是在flask请求过程中定制一些用户行为 flask 和django都有#观察者模式,又叫发布-订阅(Publish/Subscribe) 23 种设计模式之一pip3.8 install blinker# 信号:signial 翻译过来的,并发编程中学过 信号量Semaphore# 比如:用户表新增一条记录,就记录一下日志 方案一:在每个增加后,都写一行代码 ---》后期要删除,比较麻烦 方案二:使用信号,写一个函数,绑定内置信号,只要程序执行到这,就会执行这个函数# 内置信号:flask少一些,django多一些request_started = _signals.signal("request-started") # 请求到来前执行request_finished = _signals.signal("request-finished") # 请求结束后执行 before_render_template = _signals.signal("before-render-template") # 模板渲染前执行template_rendered = _signals.signal("template-rendered") # 模板渲染后执行 got_request_exception = _signals.signal("got-request-exception") # 请求执行出现异常时执行 request_tearing_down = _signals.signal("request-tearing-down") # 请求执行完毕后自动执行(无论成功与否)appcontext_tearing_down = _signals.signal("appcontext-tearing-down")# 应用上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal("appcontext-pushed") # 应用上下文push时执行appcontext_popped = _signals.signal("appcontext-popped") # 应用上下文pop时执行message_flashed = _signals.signal("message-flashed") # 调用flask在其中添加数据时,自动触发# 使用内置信号的步骤 1 写一个函数 2 绑定内置信号 3 等待被触发from flask import Flask, render_template,signals,sessionfrom flask.signals import _signalsapp = Flask(__name__)app.debug = Trueapp.secret_key = "SSSSSSSSSSSSS"# 定义信号session_set = _signals.signal("session_set")# 写一个函数def test1(*args,**kwargs): print(args) print(kwargs) print("session设置了")# 绑定自定义的信号session_set.connect(test1)@app.route("/")def hhhh(): session["lqz"] = "lqz" session_set.send("lqz") # 触发信号执行 return "hello world"@app.route("/index")def index(): return render_template("index.html", name="lqz")if __name__ == "__main__": app.run()# django中使用信号https://www.cnblogs.com/liuqingzheng/articles/9803403.htmlModel signals pre_init # django的modal执行其构造方法前,自动触发 post_init # django的modal执行其构造方法后,自动触发 pre_save # django的modal对象保存前,自动触发 post_save # django的modal对象保存后,自动触发 pre_delete # django的modal对象删除前,自动触发 post_delete # django的modal对象删除后,自动触发 m2m_changed # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发 class_prepared # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发Management signals pre_migrate # 执行migrate命令前,自动触发 post_migrate # 执行migrate命令后,自动触发Request/response signals request_started # 请求到来前,自动触发 request_finished # 请求结束后,自动触发 got_request_exception # 请求异常后,自动触发Database Wrappers connection_created # 创建数据库连接时,自动触发# django中使用内置信号1 写一个函数 def callBack(*args, **kwargs): print(args) print(kwargs) 2 绑定信号 #方式一 post_save.connect(callBack) # 方式二 from django.db.models.signals import pre_save from django.dispatch import receiver @receiver(pre_save) def my_callback(sender, **kwargs): print("对象创建成功") print(sender) print(kwargs) 3 等待触发# django中,有命令 python manage.py runserver#flask启动项目,像djagno一样,通过命令启动(版本必须匹配)Flask==2.2.2Flask_Script==2.0.3#借助于:flask-script 实现 安装:pip3.8 install flask-script 修改代码: from flask_script import Manager manager=Manager(app) manager.run() 用命令启动 python manage.py runserverfrom flask import Flaskfrom flask_script import Managerapp = Flask(__name__)app.debug =Truemanager = Manager(app)@app.route("/")def index(): return "index"if __name__ == "__main__": # app.run() manager.run()# 自定制命令 #1 简单自定制命令 @manager.command def custom(arg): # 命令的代码,比如:初始化数据库, 有个excel表格,使用命令导入到mysql中 print(arg) #2 复杂一些的自定制命令 @manager.option("-n", "--name", dest="name") @manager.option("-u", "--url", dest="url") def cmd(name, url): # python run.py cmd -n lqz -u xxx # python run.py cmd --name lqz --url uuu print(name, url)# django 中如何自定制命令# flask 中没有orm框架,对象关系映射,方便我们快速操作数据库# flask,fastapi中用sqlalchemy居多# SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果# 安装pip3.8 install sqlalchemy#了解SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件 pymysql mysql+pymysql://:@/[?] cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html 先不是orm 而是原生sql第一步:导入from sqlalchemy import create_engine# 第二步:生成引擎对象engine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/cnblogs", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置))# 第三步:使用引擎获取连接,操作数据库conn = engine.raw_connection()cursor=conn.cursor()cursor.execute("select * from aritcle")print(cursor.fetchall())# 第一步:导入from sqlalchemy import create_engineimport datetimefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index# 第二步:执行declarative_base,得到一个类Base = declarative_base()# 第三步:继承生成的Base类class User(Base): # 第四步:写字段 id = Column(Integer, primary_key=True) # 生成一列,类型是Integer,主键 name = Column(String(32), index=True, nullable=False) # name列varchar32,索引,不可为空 email = Column(String(32), unique=True) # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 ctime = Column(DateTime, default=datetime.datetime.now) # extra = Column(Text, nullable=True) # 第五步:写表名 如果不写以类名为表名 __tablename__ = "users" # 数据库表名称 # 第六步:建立联合索引,联合唯一 __table_args__ = ( UniqueConstraint("id", "name", name="uix_id_name"), # 联合唯一 Index("ix_id_name", "name", "email"), # 索引 )class Book(Base): __tablename__ = "books" id = Column(Integer, primary_key=True) name = Column(String(32))# 第七步:把表同步到数据库中# 不会创建库,只会创建表engine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/aaa", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置))# 把表同步到数据库 (把被Base管理的所有表,都创建到数据库)Base.metadata.create_all(engine)# 把所有表删除# Base.metadata.drop_all(engine)
标签:
Copyright © 2015-2023 港澳经营网版权所有 备案号:京ICP备2023022245号-31 联系邮箱:435 226 40 @qq.com