模块四 并发编程 && sql基础


并发编程

空间上的复用:将内存分为几部分,每个部分放入一个程序,这样,同一时间内存中就有了多道程序。
分时操作系统:
多个联机终端+多道技术

第三代 多道技术  单核架构下的并发   不是并行
空间复用   读取到内存中 内存硬件实现物理隔离 内存中程序不能相互访问
时间复用 
1、长时间切  上下文管理
2、io操作切换 

多核  并行   同时运行 多进程
  • 进程的2种启动方式
from multiprocessing import Process
import time


def task(name):
    print('%s is running' % name)
    time.sleep(3)
    print('%s is done' % name)


if __name__ == '__main__':
    # 支持元祖 和 字典传参 元祖就位置化参数,字典就关键字参数,注意元祖后面要加逗号
    # Process(target=task,kwargs={'name':'子进程1'})
    p = Process(target=task, args=('子进程1',))
    p.start()
    print('主')


from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s is running' % self.name)
        time.sleep(3)
        print('%s is done' % self.name)

if __name__ == '__main__':
    p = MyProcess('alex')
    p.start()
    print('主')
  • 进程常用方法
join

import time
import random
from multiprocessing import Process


def piao(name):
    print('%s piaoing' % name)
    time.sleep(random.randrange(1, 5))
    print('%s piao end' % name)


if __name__ == '__main__':
    # 实例化得到四个对象
    p1 = Process(target=piao, args=('egon',))  # 必须加,号
    p2 = Process(target=piao, args=('alex',))
    p3 = Process(target=piao, args=('wupeqi',))
    p4 = Process(target=piao, args=('yuanhao',))

    # 调用对象下的方法,开启四个进程
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主')

import random
from multiprocessing import Process
import time


def task(name):
    print('{} is start'.format(name))
    time.sleep(random.randint(1, 5))
    print('{name} is end'.format(name=name))


if __name__ == '__main__':
    p1 = Process(target=task, args=('lixiang',))
    p2 = Process(target=task, args=('alex',))
    p3 = Process(target=task, args=('sb',))

    p_len = [p1, p2, p3]

    for item in p_len:
        item.start()

    for item in p_len:
        item.join()

    print('主')
getpidgetppid

# 查看pid
import os
from multiprocessing import Process
import time

def task():

    print('%s is running,parent id is <%s>' %(os.getpid(),os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' %(os.getpid(),os.getppid()))

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print(os.getpid())


import os
import random
from multiprocessing import Process
import time


def task(name):
    print('{} is start, parents is {}'.format(name,os.getppid()))
    time.sleep(random.randint(1, 5))
    print('{name} is end, parents is {pid}'.format(name=name,pid=os.getppid()))


if __name__ == '__main__':
    p1 = Process(target=task, args=('lixiang',))
    p2 = Process(target=task, args=('alex',))
    p3 = Process(target=task, args=('sb',))

    p_len = [p1, p2, p3]

    for item in p_len:
        item.start()

    for item in p_len:
        item.join()

    print('主',os.getpid(),os.getppid())


from multiprocessing import Process
import time,os

def task(name,n):
    print('%s is running' %name)
    time.sleep(n)


if __name__ == '__main__':
    start=time.time()
    p1=Process(target=task,args=('子进程1',5))
    p2=Process(target=task,args=('子进程2',3))
    p3=Process(target=task,args=('子进程3',2))

    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()

    print('主',(time.time()-start))
is_alive,terminate,name
from multiprocessing import Process,current_process
import time,os

def task():
    print('%s is running,parent id is <%s>' %(os.getpid(),os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' %(os.getpid(),os.getppid()))

if __name__ == '__main__':
    p = Process(target=task,name='sub_process')
    p.start()
    # p.terminate()
    # time.sleep(2)
    print(p.is_alive())
    p.join()
    print('主')
    print(p.name)
    print(p.is_alive())
# 进程空间隔离

from multiprocessing import Process

n = 100

def work():
    global n
    n = 0
    print('子进程内:',n)

if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    p.join()
    print('主进程内',n)



# join

from multiprocessing import Process
import time
import random

def task(n):
    time.sleep(random.randint(1,3))
    print('----->%s' %n)

if __name__ == '__main__':
    p1 = Process(target=task,args=(1,))
    p2 = Process(target=task,args=(2,))
    p3 = Process(target=task,args=(3,))

    p1.start()
    # p1.join()
    p2.start()
    # p2.join()
    p3.start()
    # p3.join()

    print('-------4')
  • 子进程 并发任务 单核
创建子进程  操作系统调用创建,会copy父进程所有数据和状态(unix,window不会copy父进程的所有数据和状态),然后在运行,父进程只是发起开启子进程的信号(操作系统去创建子进程),然后继续执行,最后等待子进程退出后在结束

为什么要等待子进程退出后父进程才结束?
僵尸进程  有害 父进程一直不退出,子进程不回收,占用pid,导致后面进程无法启动,子进程变成僵尸需要父进程来进行回收
wait 调用退出

孤儿进程:
init 接管所有孤儿进程,无害,init来回收,父进程挂了,子进程还活着

子进程执行完后不会立即退出,直到主进程执行结束后才会退出,案例,在主进程执行的最后可以查看到子进程的pid

守护进程等待主进程代码执行完毕子进程也会跟着结束,用于主进程退出就失去了意义的场景

守护进程不能再开子进程,如果开子进程,要等待子进程执行完在退出,会产生一堆孤儿进程,一般是父进程回收子进程

测试可以看到主进程会优先执行完成不退出,子进程执行完成才会完全退出,父进程会回收子进程内存空间,就是上面wait等待子进程
# 守护进程

from multiprocessing import Process
import time

def foo():
    print(123)
    time.sleep(1)
    print(456)

def bar():
    print(456)
    time.sleep(2)
    print('enc456')


if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=bar)

    p1.daemon=True
    p1.start()
    p1.join()
    p2.start()
    print('main=====>')
互斥锁  并发 选择性能还是执行顺序 

主进程、子进程不同内存空间  内存隔离

与join区别,并发变串行,互斥锁只对写入的时候比如修改 公共文件,db的时候加锁,查看的时候可以并发查询  
join只能把所有的子进程变成串行执行


互斥锁  原理把多个进程并发修改共享数据操作变成串行,效率降低,保证数据安全
文件的方式,实现了进程之间共享数据,内存隔离,共享硬盘上的空间  
1.文件读写效率低
2.共享释放锁,需要加锁处理
# 互斥锁
from  multiprocessing import Process
import time

def task(name):
    print('%s 1' %name)
    time.sleep(1)
    print('%s 2' % name)
    time.sleep(1)
    print('%s 3' % name)


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=task,args=('进程%s' %i,))
        p.start()
        p.join()


from multiprocessing import Process, Lock
import pickle
import time


def search(name):
    time.sleep(1)
    dic = pickle.load(open('db.txt', 'r', encoding='utf-8'))
    print('<%s> 查看到剩余票数【%s】' % (name, dic['count']))


def get(name):
    time.sleep(1)
    dic = pickle.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(3)
        pickle.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 购票成功' % name)
# 锁 多个进程并发修改共享数据操作变成串行,效率降低,保证数据安全

def task(name, mutex):
    search(name)
    mutex.acquire()
    get(name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=('路人%s' % i, mutex))
        p.start()
# join可以解决问题,不过把所有进程都变成了串行执行,对读操作不影响,应该只在写的时候串行执行,join无法做到,这就是互斥锁存在的意义
# 队列的使用 先进先出  put get full empty
from multiprocessing import Queue

q = Queue(3)
q.put('hello')
q.put({'a':1})
q.put([3,3,3,])
print(q.full())

# q.put(4)
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
print(q.get())


# 消费生产

from multiprocessing import Process,Queue
import time

def producer(q):
    for i in range(10):
        res = '包子%s' %i
        time.sleep(0.5)
        print('生产着生产了%s' %res)

        q.put(res)

def consumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('消费者吃了%s' %res)

if __name__ == '__main__':
    q = Queue()

    p1=Process(target=producer,args=(q,))
    p2=Process(target=producer,args=(q,))
    p3=Process(target=producer,args=(q,))

    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print('主')
# 通过 joinablequeue 优雅的让消费者退出

from multiprocessing import Process,JoinableQueue
import time

def producer(q):
    for i in range(10):
        res = '包子%s' %i
        time.sleep(0.5)
        print('生产着生产了%s' %res)

        q.put(res)
    q.join()

def consumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('消费者吃了%s' %res)
        q.task_done()

if __name__ == '__main__':

    q = JoinableQueue()

    # 生产者们 生产的块,多一个进程  消费不完
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    print('主')

线程

# 2种启动线程方式
import time
import random
from threading import Thread

def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s piao end' %name)

if __name__ == '__main__':
    t1 = Thread(target=piao,args=('egon',))
    t1.start()
    print('主线程')


class Mythread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s piaoing' %self.name)
        time.sleep(random.randint(1, 3))
        print('%s piao end' % self.name)

if __name__ == '__main__':
    t1 = Mythread('egon')
    t1.start()
    print('主')


# 进程与线程区别
# 进程开销大于线程 看启动顺序
import time
from threading import Thread
from multiprocessing import Process

def piao(name):
    print('%s piaoing' %name)
    time.sleep(2)
    print('%s piao end' %name)

if __name__ == '__main__':
    p1 = Process(target=piao,args=('egon',))
    p1.start()

    t1 = Thread(target=piao,args=('egon子线程1',))
    t1.start()
    print('主线程')
# 同一进程内多个线程共享该进程的地址空间 == 内存
from threading import Thread
from multiprocessing import Process

n = 100
def task():
    global n
    n = 0

if __name__ == '__main__':
    # p1 = Process(target=task,)
    # p1.start()
    # p1.join()

    t1 = Thread(target=task,)
    t1.start()
    t1.join()

    print('主线程',n)
# pid 子线程的id跟主线程id一样,内存共享,父进程就是运行的pycharm
from threading import Thread
from multiprocessing import Process, current_process
import os


def task():
    print(current_process().pid)
    print('子进程ID:%s 父进程的PID:%s' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    # p1 = Process(target=task,)
    # p1.start()

    t1 = Thread(target=task,)
    t1.start()

    print('主线程',current_process().pid)
    print('主线程',os.getpid())
# Thread 对象的属性或方法
import time
from threading import Thread,active_count,enumerate,currentThread,current_thread

def task():
    print('%s is ruuning' %current_thread().getName())
    time.sleep(2)
    print('%s is done' %current_thread().getName())

if __name__ == '__main__':
    t = Thread(target=task,name='子线程1')
    t.start()

    t.setName('儿子线程1')
    t.join()
    print(t.getName())

    # currentThread().setName('主线程')
    # print(t.isAlive())

    print('主线程',currentThread().getName())

    t.join()
    print(active_count())
    print(enumerate())
# 守护线程

from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print('%s say hello' % name)

if __name__ == '__main__':
    t = Thread(target=sayhi,args=('egon',))
    t.setDaemon(True)
    # t.daemon = True
    t.start()

    print('主线程')
    print(t.is_alive())


# 注意 跟守护进程还是有区别的,线程是在一个PID中,也就是子主线程都在一起,一个地址空间,主的完了,非守护线程还会执行,保证线程运行直接子线程结束
from threading import Thread
import time

def foo():
    print(123)
    time.sleep(1)
    print('end123')

def bar():
    print(456)
    time.sleep(3)
    print('end456')

if __name__ == '__main__':
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)
    t2.daemon = True
    # t1.daemon = True
    t1.start()
    t2.start()

    print('main====>')

# 123
# 456
# main====>
# end123
# 互斥锁

from threading import Thread, Lock
import time

n = 100

def task():
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    print(n)
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    t_l = []
    for i in range(100):
        t=Thread(target=task)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    print('主',n)

# GIL解释器锁

# 计算密集型 多进程

from multiprocessing import Process
from threading import Thread
import os, time


def work():
    res = 0
    for i in range(100000000):
        res *= 1


if __name__ == '__main__':
    l = []
    print(os.cpu_count())
    start = time.time()
    for i in range(8):
        # p = Process(target=work)  # 耗时6s多
        p = Thread(target=work)  # 耗时29.49s  相差4倍 
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop = time.time()
    print('run time os %s' %(stop - start))
# IO 密集型:用多线程

from multiprocessing import Process
from threading import Thread
import threading
import os,time

def work():
    time.sleep(2)

if __name__ == '__main__':
    l = []
    start = time.time()
    for i in range(400):
        # p = Process(target=work)  # 5.63  开进程 消耗资源和时间
        p = Thread(target=work)  # 2.03  视频io 密集型,
        l.append(p)
        p.start()
    for p in l:
        p.join()

    stop = time.time()
    print('run time is %s' % (stop - start))
# 死锁  递归锁

from threading import Thread, Lock
import time

metexA = Lock()
metaxB = Lock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        metexA.acquire()
        print('%s 拿到了A锁' %self.name)

        metaxB.acquire()
        print('%s 拿到了B锁' %self.name)
        metaxB.release()

        metexA.release()

    def f2(self):
        metaxB.acquire()
        print('%s 拿到了B锁' % self.name)
        time.sleep(0.1)

        metexA.acquire()
        print('%s 拿到了A锁' % self.name)

        metexA.release()
        metaxB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()


# 互斥锁 只能acquire一次

from threading import Thread,Lock
metaxA = Lock()
metaxA.acquire()
metaxA.release()


# 解决 递归锁 可以连续acquire多次,每次acquire一次计数器加1 ,只有计数器为0时,才能被抢到acquire  递归锁不会死锁 不过变向的变成了串行执行,要释放完锁才能被其他线程拿到锁


from threading import Thread,RLock
import time

metaxB = metexA = RLock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        metexA.acquire()
        print('%s 拿到了A锁' %self.name)

        metaxB.acquire()
        print('%s 拿到了B锁' %self.name)
        metaxB.release()

        metexA.release()

    def f2(self):
        metaxB.acquire()
        print('%s 拿到了B锁' % self.name)
        time.sleep(0.1)

        metexA.acquire()
        print('%s 拿到了A锁' % self.name)

        metexA.release()
        metaxB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
# 信号量
from threading import Thread,Semaphore,currentThread
import time,random

sm = Semaphore(2)

def task():
    # sm.acquire()
    # print('%s in ' % currentThread().getName())
    # sm.release()
    with sm:
        print('%s in' % currentThread().getName())
        time.sleep(random.randint(1,3))

if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task)
        t.start()

# event事件

from threading import  Thread,Event
import time

event = Event()
# event.set()
# event.wait()

def student(name):
    print('学生%s 正在听课' %name)
    event.wait(10)
    print('学生%s 课件活动' %name)

def tearcher(name):
    print('老师%s 正在授课' %name)
    time.sleep(7)
    event.set()

if __name__ == '__main__':
    stu1 = Thread(target=student,args=('alex',))
    stu2 = Thread(target=student, args=('exx',))
    stu3 = Thread(target=student, args=('yxx',))
    t1 = Thread(target=tearcher,args=('egon',))

    stu1.start()
    stu2.start()
    stu3.start()
    t1.start()



from threading import Thread,Event,currentThread
import time

event = Event()

def conn():
    n = 0
    while not event.is_set():
        if n == 3:
            print('%s try too many times' % currentThread().getName())
            return
        print('%s try %s' %(currentThread().getName(),n))
        event.wait(0.5)
        n += 1
    print('%s is connected' %currentThread().getName())


def check():
    print('%s is checking' % currentThread().getName())
    time.sleep(1)
    event.set()

if __name__ == '__main__':
    for i in range(3):
        t = Thread(target=conn)
        t.start()
    t = Thread(target=check)
    t.start()

# Thread-3 is connected
# Thread-1 is connected
# Thread-2 is connected
# 定时器
from threading import Timer

def task(name):
    print('hello %s' %name)

t = Timer(5,task,args=('egon',))
t.start()

# 验证码5s过期 实现
from threading import Timer,Thread
import random

class Code(object):
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache = self.make_code()
        print(self.cache)
        self.t = Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res = ''
        for i in range(n):
            s1 = str(random.randint(0,9))
            s2 = chr(random.randint(65,90))
            res += random.choice([s1,s2])
        return res

    def check(self):
        while True:
            code = input('请输入你的验证码:').strip()
            if code.upper() == self.cache:
                print('验证码输入正确')
                self.t.cancel()
                break

obj = Code()
obj.check()
# 线程queue  类似进程queue == 生成消费模型
import queue

q = queue.Queue(3)  # 先进先出

q.put('first')
q.put(2)
q.put('third')

# q.put(4)
# q.put(4,block=False) # q.put_nowait(4)
# q.put(4,block=True,timeout=3)  # queue.Full

print(q.get())
print(q.get())
print(q.get())

# print(q.get(block=False))  # q.get_nowait()  _queue.Empty
# print(q.get(block=True,timeout=3))

import queue
q = queue.LifoQueue(3) # 栈 后进先出

q.put('first')
q.put(2)
q.put('third')

print(q.get())
print(q.get())
print(q.get())


q = queue.PriorityQueue(3) # 优先级队列 越小越优先

q.put((10,'one'))
q.put((40,'two'))
q.put((3,'three'))

print(q.get())
print(q.get())
print(q.get())
# 进程池 线程池

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os, time, random


def task(name):
    print('name: %s pid:%s run ' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # 计算密集型 单线程可以避免GIL,提高计算效率
    # pool = ThreadPoolExecutor(5) # io密集型 

    for i in range(10):  # 并发10个,一次只能运行5个 线程池限制了线程数
        pool.submit(task, 'egon%s' % i)
    pool.shutdown(wait=True)  # 类似join效果

    print('主')

# 线程跟主线程公用一个pid,相同内存地址,因此主线程执行完成,子线程如果还在执行线程就没有结束,守护线程与进程的区别,要注意
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
import os,time,random

def task():
    print('name:%s pid:%s run' %(currentThread().getName(),os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task,)
    pool.shutdown(wait=True)

    print('主')

# name:ThreadPoolExecutor-0_1 pid:29985 run
# name:ThreadPoolExecutor-0_4 pid:29985 run
# 异步调用与回调机制
# 提交任务2中方式
# 1、同步调用:提交任务完后,就在原地等待任务执行完毕,拿到结果,在执行下一行代码,导致程序是串行执行
# 顺序执行需要上次执行的结果,要等待result()

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print('%s is laing' %name)
    time.sleep(random.randint(1,3))
    res = random.randint(7,13)*'#'
    return {'name':name, 'res':res}

def weigh(shit):
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 <%s>kg' %(name,size))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)

    start = time.time()

    shit1 = pool.submit(la,'alex').result()
    weigh(shit1)

    shit2 = pool.submit(la,'wupeiqi').result()
    weigh(shit2)

    shit3 = pool.submit(la,'agon').result()
    weigh(shit3)

    end = time.time()

    print('耗时%s' % (end - start))

# alex is laing
# alex 拉了 <13>kg
# wupeiqi is laing
# wupeiqi 拉了 <9>kg
# agon is laing
# agon 拉了 <8>kg
# 耗时8.00


# 2.异步调用:提交完任务后,不能等待任务执行完毕,并发执行  

from concurrent.futures import ThreadPoolExecutor
import time,random

def la(name):
    print('%s is laing' % name)
    time.sleep(random.randint(1, 3))
    res = random.randint(7, 13) * '#'
    return {'name': name, 'res': res}

def weigh(shit):
    shit = shit.result()
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 《%s》kg' % (name, size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)

    start = time.time()

    pool.submit(la,'alex').add_done_callback(weigh)

    pool.submit(la, 'wupeiqi').add_done_callback(weigh)

    pool.submit(la, 'egon').add_done_callback(weigh)

    end = time.time()

    print('耗时%s' %(end - start))

# alex is laing
# wupeiqi is laing
# egon is laing
# 耗时0.0005919933319091797
# wupeiqi 拉了 《7》kg
# egon 拉了 《13》kg
# alex 拉了 《7》kg
# 回调函数练习 注意result(),add_done_callback()

from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):
    print('GET %s' % url)
    response = requests.get(url)
    time.sleep(3)
    return {'url':url,'content':response.text}


def parse(res):
    # res = res.result()
    res = res.result()
    print('%s parse res is %s' %(res['url'],len(res['content'])))


if __name__ == '__main__':
    urls = [

        'http://www.cnblogs.com/linhaifeng',
        'https://www.python.org',
        'https://www.openstack.org',
    ]

    pool = ThreadPoolExecutor(2)

    for url in urls:
        pool.submit(get,url).add_done_callback(parse)
        # ret = pool.submit(get, url)   # 只会执行get,不会传递到parse执行
        # parse(ret)  # TypeError: 'Future' object is not subscriptable
# 协程
# 并发执行
import time

def producer():
    g = consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

def consumer():
    while True:
        res = yield

start_time = time.time()
producer()
end_time = time.time()
print(end_time - start_time)
# 0.778

# 串行
import time

def producer():
    res = []
    for i in range(10000000):
        res.append(i)
    return res

def consumer(res):
    pass

start_time=time.time()
res=producer()
consumer(res)
stop_time=time.time()
print(stop_time-start_time)
# 0.686
# 实现切换 + 保存状态,但没解决io堵塞的自动发现的问题

from greenlet import greenlet
import time

def eat(name):
    print('%s eat 1' %name)
    time.sleep(10)
    g2.switch('egon')
    print('%s eat 2' %name)
    g2.switch()

def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2'% name)

g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch('egon')

# egon eat 1
# egon play 1
# egon eat 2
# egon play 2


# 解决发现io 自动进行切换 通过monkey实现
from gevent import monkey; monkey.patch_all()
import gevent
import time

def eat(name):
    print('%s eat 1' % name)
    time.sleep(3)
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' % name)
    time.sleep(4)
    print('%s play 2' % name)

start_time = time.time()
g1 = gevent.spawn(eat,'egon')
g2 = gevent.spawn(play,'alex')

g1.join()
g2.join()

end_time = time.time()

print(end_time - start_time)
# 4.00
from gevent import monkey;monkey.patch_all()
import gevent
import time


def eat(name):
    print('%s eat 1' % name)
    time.sleep(3)
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' % name)
    time.sleep(4)
    print('%s play 2' % name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,'alex')

# time.sleep(5)
#
# g1.join()
# g2.join()

gevent.joinall([g1,g2])

queue实现线程池

import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

queue = queue.Queue()



class ThreadNum(threading.Thread):
    """没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        done = False
        while not done:
            # 消费者端,从队列中获取num
            num = self.queue.get()
            if num is None:
                done = True
            else:
                print("Retrieved", num)
            time.sleep(1)
            # 在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号
            self.queue.task_done()

        print("Consumer Finished")


def main():
    # 产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启5个并发
    for i in range(5):
        t = ThreadNum(queue)
        t.setDaemon(True)
        t.start()

    # 往队列中填错数据
    for num in range(10):
        queue.put(num)

    queue.join()
    # time.sleep(10)
    for i in range(5):
        queue.put(None)
        print('None')
    time.sleep(2)


if __name__ == '__main__':
    start = time.time()
    queue.maxsize = 2
    main()
    print("Elapsed Time: %s" % (time.time() - start))

import threading, queue, time


class Threadpool:
    '''基于队列queue实现的线程池'''

    def __init__(self, max_thread=1):
        '''创建进程队列'''
        self.queue = queue.Queue(maxsize=max_thread)

    def apply(self, target=None, args=(), callback=None, calljoin=True, **kwargs):
        ''':param callback 回调函数 当子线程函数运行结束后将返回值传入回调函数
            :param calljoin 布尔值  回调函数是否阻塞进程池 默认True 只有当目标函数和回调函数都执行结束后才视为该线程结束
            其他参数同threading.Thread类
            注意:只有当目标函数和回调函数都执行结束后,消息队列才会取回值(即回调函数会阻塞线程池)
        '''
        if not callback:
            callback = self._callback
        t = threading.Thread(target=self._decorate(target, callback, calljoin), args=args, **kwargs)
        self.queue.put(t)
        t.start()

    def join(self):
        '''
            当线程池中还有未执行结束的子线程时 阻塞主线程
            注意:当calljoin=False时 因回调函数在消息队列取回后才执行 故join不会等待回调函数
        '''
        while self.queue.qsize():
            time.sleep(0.05)

    def _decorate(self, target, callback, calljoin):
        ''':param target 接收一个目标函数
            :param  接受一个回调函数callback
            :param backjoin 布尔值 若为真 则当回调函数执行结束后才释放队列 否则 当目标函数执行结束后就会释放队列
            本函数本质上是一个装饰器,即运行目标函数后,执行队列取回(self.queque.get()),并将返回值作为参数执行回调函数。
        '''

        def wrapper(*args, **kwargs):
            res = target(*args, **kwargs)
            if calljoin:
                callback(res)
                self.queue.get()
            else:
                self.queue.get()
                callback(res)
            return res

        return wrapper

    def _callback(self, *args, **kwargs):
        '''没有传入回调函数时 什么也不干'''
        pass


# 调用示例:
result_list = []


def func(arg):
    print('正在等待执行%s' % arg)
    time.sleep(10)
    return arg


def back(res):
    print('我已经取回了数据:%s' % res)
    result_list.append(res)


pool = Threadpool(max_thread=2)
for i in range(40):
    pool.apply(target=func, args=(i,), callback=back)
pool.join()
print(result_list)
class Threadpool:
    def __init__(self, maxthread):
        self.maxthread = maxthread
        self.queue = queue.Queue(maxsize=maxthread)
        for i in range(maxthread):
            self.queue.put(Thread)

    def put_thread(self):
        '''put'''
        self.queue.put(Thread)

    def get_thread(self):
        '''get'''
        return self.queue.get()

    def get_queue_size(self):
        '''size'''
        return self.queue.qsize()

pool = Threadpool(config_info['set_thread'])
t = pool.get_thread()
obj = t(target=communicate, args=(conn,))
obj.start()

线程池实现并发socket

rom socket import *
from threading import Thread

def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip,port):
    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn,addr = server.accept()
        t = Thread(target=communicate,args=(conn,))
        t.start()
    server.close()


if __name__ == '__main__':
    server('127.0.0.1',8081)

# 基于线程池实现

from socket import *
from concurrent.futures import ThreadPoolExecutor


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip,port):
    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn,addr = server.accept()
        pool.submit(communicate,conn)

    server.close()


if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    server('127.0.0.1',8081)


from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8081))

while True:
    data = input('>>>(q/Q exit)').strip()
    if data.upper() == 'Q':break
    if not data:continue
    client.send(data.encode('utf-8'))
    res_data = client.recv(1024)
    print(res_data.decode('utf-8'))

client.close()

非阻塞IO

from socket import *

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8082))
server.listen(5)
server.setblocking(False)
print('starting...')


rlist = []
wlist = []

while True:
    try:
        conn, addr = server.accept()
        rlist.append(conn)
        print(rlist)
    except BlockingIOError:
        # print('干其他的活')

        del_rlist = []
        for conn in rlist:
            try:
                data = conn.recv(1024)
                if not data:
                    del_rlist.append(conn)
                    continue
                wlist.append((conn,data.upper()))
            except BlockingIOError:
                continue
            except Exception:
                conn.close()
                del_rlist.append(conn)


        del_wlist = []  # remove不能修改源列表,so 新创建一个删除列表
        for item in wlist:
            try:
                conn = item[0]
                data = item[1]
                conn.send(data)
                del_wlist.append(item)
            except BlockingIOError:
                pass

        for item in del_wlist:
            wlist.remove(item)

        for conn in del_rlist:
            rlist.remove(conn)

server.close()


from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8082))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()

io多路复用

from socket import *
import select

server = socket(AF_INET, SOCK_STREAM)
server.bind('127.0.0.1', 8003)
server.listen(5)
server.setblocking(False)
print('starting...')

rlist = [server, ]
wlist = []
wdata = {}

while True:
    rl, wl, xl = select.select(rlist, wlist, [], 0.5)
    print('rl', rl)
    print('wl', wl)

    for sock in rl:
        if sock == server:
            conn, addr = sock.accept()
            rlist.append(conn)
        else:
            try:
                data = sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                wlist.append(sock)
                wdata[sock] = data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)

    for sock in wl:
        data = wdata[sock]
        sock.send(data)
        wlist.remove(sock)
        wdata.pop(sock)

server.close()


from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8083))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()

协程解决io阻塞

from gevent import monkey,spawn;monkey.patch_all()

from socket import *


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            res_data = data.decode('utf-8').upper()
            conn.send(res_data.encode('utf-8'))
        except ConnectionResetError:
            break
    conn.close()

def server(ip,port):


    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn,addr = server.accept()
        spawn(communicate,conn)

    server.close()


if __name__ == '__main__':
    g = spawn(server,'127.0.0.1',8081)
    g.join()


from socket import *
from threading import Thread,currentThread

def client():

    client = socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8081))

    while True:
        client.send(('%s hello' % currentThread().getName()).encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))

    client.close()

if __name__ == '__main__':
    for i in range(100):
        t = Thread(target=client)
        t.start()
队列   解决进程之间通讯的问题   进程在内存中是相互隔离的
改成共享内存   解决上面2个问题
IPC通信  机制  进程之间通信  队列 & 管道  -- 共享内存空间    队列 = 管道 + 锁   先进先出   

队列用于进程之间通讯使用  生产者消费者模型

q.task_done()
消费者使用此方法发出信号,表示q.get()返回的项目已经被处理完成。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

q.join()
生产者使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。


2021.03.23  并发编程2

主进程执行结束 依旧会等待子进程执行完毕回收内存空间,原因就是自己的父进程回收最快,如果父进程挂了就是孤儿进程,init统一回收

子进程变成僵尸进程,父进程回收, 如果父进程一直不退出,子进程不回收,占用pid,有害的

1、开进程的开销远大于开线程
2、同一进程内的多个线程共享该进程的地址空间
3、pid

主线程(主进程)   子线程

多线程与多进程区别:一个进程下多个线程,共享内存空间    

多线程  互斥锁
1、局部串行  只涉及共享数据修改的部分进行加锁
2、牺牲性能,提高数据安全,把并行变成串行   

GIL 全球解释器锁  Cpython解释器 
效率降低,保证数据安全

多线程无法用上多核优势,1个进程10个线程 == 单核  

多进程 单线程 (GIL就不会出现) 解决 多线程无法利用多核的情况   

垃圾回收机制变得安全

01

垃圾回收机制不是一直存在,定期执行

保证不同的数据加不同的锁,GIL 保证Python解释器垃圾回收安全   Cpython解释器

io 密集型  多线程  并发执行,同时只能运行一个程序 == 只能用于单核    共享内存空间,并发执行,节省CPU等待时间
计算密集型 多进程  并发执行,一个进程跑一个线程,串行执行,不存在GIL,可以跑在多核    牺牲内存空间,提高效率利用多核 

多线程不断切换并行执行 等待GIL 

知乎补充:
Python中的多线程是假的多线程! 为什么这么说,我们先明确一个概念,全局解释器锁(GIL)

I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处。

多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,常见的大部分任务都是IO密集型任务,比如Web应用。
综上:
Python多线程相当于单核多线程,多线程有两个好处:CPU并行,IO并行,单核多线程相当于自断一臂,只使用了IO并行。所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
互斥锁  只能acquire一次  第二次卡主
递归锁解决死锁问题计数器+1当计数器==0  其他线程才能拿到锁
全球解释器锁 GIL  

event 线程同步
线程 queue 

线程池 进程池

线程池进程池
用户侧无法控制用于控制服务端开线程和进程的个数防止服务器被无限多开内存OOM
线程池 用于io密集型   共享内存  只能用单核  可以实现并发效果  
进程池 用于CPU密集型   牺牲内存 提升计算效率  也可以实现并发效果    
多进程单线程  避免GIL的问题

提交任务的2种方式
异步调用 回调机制  实现并发执行
同步调用 等待结果  串行执行

同步任务  非阻塞 没有关系阻塞是遇到io调用停止同步任务只是单纯的等待任务执行不管是否阻塞还是非阻塞 

协程  单线程实现并发  实现切换和状态保留  本质还是一个线程执行
并发 看起来同时执行  实际还是一个一个的执行同时执行的只有一个只会快速切换看起来像同时执行  不存在锁的问题
并行  无法实现并行

单线程执行5个任务  切换保存状态  

遇到io才切换yield不能实现

02

io 复用 阻塞 === 非阻塞  

io模型
同步 不是阻塞  只是一种任务执行方式
异步  回调机制

阻塞 只是遇到io 操作系统CPU会被调走  
非堵塞  

socket  阻塞IO 模型
copy data
wait data

不能在循环列表中改变结构死循环
改变后会有一个bug之前面试题有个是删除列表中内容导致后面内容跳过索引变化了要倒序删除可以解决这个问题

socketserver 模块 
socketTCPThread 多线程 UDP   FORK 多进程

资源单位  进程 
运行单位  线程
协程、并发、并行
并发不是并行 单核多线程也可以实现并发

信号量 控制并发线程的数量

Semaphore时需要为这个许可集传入一个数量值该数量值代表同一时间能访问共享资源的线程数量
from threading import Thread,Semaphore,currentThread
import time,random

sm = Semaphore(2)

def task():
    # sm.acquire()
    # print('%s in ' % currentThread().getName())
    # sm.release()
    with sm:
        print('%s in' % currentThread().getName())
        time.sleep(random.randint(1,3))

if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task)
        t.start()



event 事件
event.set()
event.wait()

可以使一个线程等待其他线程的通知其内置了一个标志初始值为False线程通过wait()方法进入等待状态直到另一个线程调用set()方法将内置标志设置为True时Event通知所有等待状态的线程恢复运行调用clear()时重置为 False还可以通过isSet()方法查询Envent对象内置状态的当前值

isSet(): 当内置标志为True时返回True
set(): 将标志设为True并通知所有处于等待阻塞状态的线程恢复运行状态
clear(): 将标志设为False
wait([timeout]): 如果标志为True将立即返回否则阻塞线程至等待阻塞状态等待其他线程调用set()


定时器 timer  验证码5s过期

from threading import Timer,Thread
import random

class Code(object):
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache = self.make_code()
        print(self.cache)
        self.t = Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res = ''
        for i in range(n):
            s1 = str(random.randint(0,9))
            s2 = chr(random.randint(65,90))
            res += random.choice([s1,s2])
        return res

    def check(self):
        while True:
            code = input('请输入你的验证码:').strip()
            if code.upper() == self.cache:
                print('验证码输入正确')
                self.t.cancel()
                break

obj = Code()
obj.check()
queue 三种  
队列
栈
优先级队列

进程池 线程池  用线程池实现socket链接,控制client连接数量,之前是每来一个client就启动一个线程,
资源不可能无限多,控制线程数量保证服务稳定

多线程 有并发的效果,但不是并行,看起来像同时执行,实际同时只有一个线程在执行,快速切换实现并发效果
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os, time, random


def task(name):
    print('name: %s pid:%s run ' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # 计算密集型 单线程可以避免GIL,提高计算效率
    # pool = ThreadPoolExecutor(5) # io密集型 单核 并发

    for i in range(10):  # 并发10个,一次只能运行5个 线程池限制了线程数
        pool.submit(task, 'egon%s' % i)
    pool.shutdown(wait=True)  # 类似join效果

    print('主')

# 线程跟主线程公用一个pid,相同内存地址,因此主线程执行完成,子线程如果还在执行线程就没有结束,守护线程与进程的区别,要注意
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
import os,time,random

def task():
    print('name:%s pid:%s run' %(currentThread().getName(),os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task,)
    pool.shutdown(wait=True)

    print('主')

# name:ThreadPoolExecutor-0_1 pid:29985 run
# name:ThreadPoolExecutor-0_4 pid:29985 run

异步调用与回调机制

协程 简单回顾协程原理

协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的

对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程

协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:

1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

优点:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点:
1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

总结协程特点:
必须在只有一个单线程里实现并发
修改共享数据不需加锁
用户程序里自己保存多个控制流的上下文栈
附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

多线程  并发执行 比 串行执行效率要高很多,特别在高IO模型中,串行就只能等待,并发多个线程能充分利用单核CPU,虽然同时只有一个线程在执行,但是并发可以保证尽最大可能利用CPU资源,串行就只能等待,这就是多线程存在的意义

模拟面试:

315 面试题  https://www.cnblogs.com/wupeiqi/p/9078770.html
算法、性能优化

sql

  • 数据类型
整型  tinyint   1字节  signed  带符号  
   int    4字节  unsigned 不带符号  int(10)  10这里是显示宽度  不是代表用多少字节,存储宽度,除了整型其他括号里面都是多少字节,整型不同类型已经决定了存放字节大小。 
  SMALLINT  2字节
  BIGINT  8字节

浮点型  float   4字节  单精度   
             double 8字节   双精度
     decimal   8字节 无限小数

日期类型  
DATETIME
\c 结束sql执行  或者 ';'  

字符串  
去除末尾空格,name = 'test';   前面空格和中间空格不会去除
like,不会去除末尾空格,要匹配空格
char     定长,存储快,浪费空间  没有头
varchar  存储慢,节省空间,有头
不混用
char > varchar > text

枚举  
enum  单选   不在范围内的为空
set  多选  (read,eat,sports,)
  • 约束条件
id int(11) unsigned  zerofill  
Null /  not null default 'male'   不能为空
unique key 单列唯一 unique(id),
联合唯一   unique(ip,port)
primary key   主键
约束:  not null unique   不为空且唯一 
存储引擎(innodb): 对于innodb存储引擎来说,一张表内必须有一个主键
没有建会自动找到不为空且唯一字段设置为主键,没有,就会建一个隐藏字段作为主键
单列主键

复合主键
primary key(ip,port)
auto_increment  
id int primary key auto_increment,
会从上一次id继续增长  默认 步长1 
show variables like 'auto_inc%'
步长 auto_increment_increment默认为1
启始偏移量  auto_increment_offset 默认 1

设置步长
set session auto_increment_increment=5;
set global auto_increment_increment=5;


# 设置启始偏移量
set global   auto_increment_offset = 3;
强调: 起始偏移量 <= 步长

清空表
delete from t20; where id = 3
采用 清空表首选
truncate t20;  # 自增长置空

外键 建立表之间关系  foreign key
 先建被关联的表 dep,并且保证被关联的字段唯一   在建关联的表emp  (多)  1对多  在多的里面建立foreign key字段
 先往被关联表dep插入记录  在往关联表emp插入记录(多)
 先删关联表emp 数据,在删被关联表dep数据  or on delete cascade on update cascade  可以直接删除或更新被关联表dep,关联表emp数据会自动删除或更新
 正常情况是删除关联表emp数据,在删被关联表dep数据没问题,如果要实现删除被关联表dep数据自动删除关联表emp数据需要在关联表emp中加入on delete cascade on update cascade
 扩展性 逻辑去建立外键,而不是在库中建立硬性关系,强制耦合在一起,不方便扩展,解耦
  • 两张表之间的关系:

    多对一
    出版社   书(foreign key(press_id) reference press(id) on delete cascade on update cascade
    多对多
    作者     建立第三张表,双向foreign key   没有第三张表无法建立(相互依赖,都是被关联表,无法创建)
    一对一
    客户  学生   注意先建立那张表,先有客户再有学生,先创建客户表,在学生表上customer_id加上约束
    foreign key unique 
         customer_id int unique, #该字段一定要是唯一的
         foreign key(customer_id) references customer(id) #外键的字段一定要保证unique
         on delete cascade
         on update cascade
    
  • 增删改查

insert into 表名 select 
update 表名 set  xx=xx where 条件
delete from 表名 where 条件
truncate 表名
select 
单表  distinct 去重、wheregroup byhaving 过滤、order bylimit n
简单查询  
支持四则运算 字段 salary*12  
定义显示格式 CONCAT()   concat('姓名:', name,'性别:',sex)
select concat_ws(':',name,sex,age) from employee

where  between and >=<inlike% 任意字符 / _ 任意一个字符)、not between andis nullis not null

分组 == 分类  
set global sql_mode = 'ONLY_FULL_GROUP_BY'  只能取分组的字段,以及每个组聚合的结果
聚合函数 max minavgsumcount
postcount(id) as emp_count 

不要用unique 字段作为分组的依据,没有意义

没有group by 则默认整体算作一组
select max(salary) from employee

# group_concat
select post,group_concat(name) from employee group by post;

# having 分组之后过滤  where 分组之前过滤

# order by 排序 order by id ascage desc;
where avg(salary) > 100 报错  select avg(salary) from dep;  可以执行 有默认组可以用聚合函数
\c 退出
注意执行顺序,首先执行where,在执行group by  having  select  最后执行order by  
因此select执行的时候其实有默认组,别名可以在order by之后加入,因为最后运行order by

# limit 
分页  0,5 5,10 10,15

总结:

语法顺序:
select  distinct xx,xx,xx from .
where 条件
group by  分组条件
having 过滤
order by 排序字段
limit n 
执行顺序
from - where = group = having = distinct = select = order = limit

正则表达式 
where name regexp '^jin'
where name regexp '^jin.*(g|n|l)$'

多表查询:
笛卡尔积 select * from employee,department;   
select * from employee,department where employee.id = department.id
表之间连接
内连接 只取两张表的共同部分
select * employee inner join department  on employee.dep_id = department.id;

左连接 在内连接的基础上保留左表的记录
select * employee left join department  on employee.dep_id = department.id;

右连接在内连接的基础上保留右表的记录
select * employee right join department  on employee.dep_id = department.id;

全外连接 在内连接的基础上保留左右两表没有对应关系的记录
没有full join 
union实现
select * employee left join department  on employee.dep_id = department.id 
union
select * employee right join department  on employee.dep_id = department.id;

关联查询
select department.name,avg(age) from employee inner join department on employee group by department.name having avg(age) > 30;
执行顺序:
select distinct -> from -> join ->on -> where = group = having => distinct = select = order by = limit 

子查询
IN NOT IN exists

多表查询

练习: https://www.cnblogs.com/linhaifeng/articles/7267596.html#_label3

多表查询练习  查看以前笔记
sql 练习这块讲的不行,子查询过多,性能较低,应该用连表left join操作,具体如下:
SELECT student_id,sname FROM score LEFT JOIN student ON score.student_id = student.sid WHERE course_id = 3 AND num > 80 

权限管理,库、表权限
show grants for '用户'@'IP地址'                  -- 查看权限
grant  权限 on 数据库. to   '用户'@'IP地址'      -- 授权
revoke 权限 on 数据库. from '用户'@'IP地址'      -- 取消权限


表之间关系都是通过外键来实现,
外键唯一  一对一  
外键关联 一对多
相互外键  多对多


Navicat pymysql

sql注入  拼接
egon -- xxxx

03

mysql内置功能 程序解耦 专用mysql开发

视图  不推荐使用  扩展的时候找到所有视图进行修改    多表、查     一般不用修改,只是查询使用
create view xxx(表名) as select * from couerse inner join tearcher on xx.id = xx.tid
alter view   
drop view

触发器 增删改  
插入触发执行其他sql
代码中可以实现 
扩展行不好,在db中增加触发器迁移也不方便,比较坑

存储过程
内置功能封装 
cusor.callproc('p2',(2,4,0)) @_p2_0,@_p2_1,@_p2_2
cursor.excute('select @_p2_2')

应用程序与数据库结合使用的三种方式
方式1  运行效率高   扩展性低   
python:调用存储过程
mysql:编写存储过程

方式2:运行效率低   扩展性高
pyhon:编写纯生sql
mysql

方式3:开发效率高
PythonORM  - 纯生SQL
MYsql

事务  原子性操作  转账   同时成功同时失败
存储过程特殊的一种定义方式

上面都用于存储过程

函数和流程控制 
DATE_FORMAT
自定义函数,流程控制,存储过程实现

索引原理

索引数据结构 b+
索引加多了会导致写的慢,建立目录,改变表结构就必须重建索引
innodb 聚集索引 主键自动创建索引  
怎么用上索引?具体条件 非全文扫描   模糊也会影响索引效果
书的目录,找所有没有优化空间,如果只是找具体值,可以加速查找

有数据重建索引比较慢,需要对数据结构改变
联合索引
覆盖索引

socketserver源码解析