博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python_day09 多进程 多线程 协程 paramiko模块
阅读量:4843 次
发布时间:2019-06-11

本文共 13767 字,大约阅读时间需要 45 分钟。

多进程

多线程
协程
paramiko模块

1、基于UDP的套接字

UDP是面向数据报的,不是面向连接的

from socket import *udp_server=socket(AF_INET,SOCK_DGRAM)udp_server.bind(('127.0.0.1',8080))while True:    data,client_addr=udp_server.recvfrom(1024)    print(data,client_addr)    udp_server.sendto(data.upper(),client_addr)
UDP服务端
from socket import *udp_client=socket(AF_INET,SOCK_DGRAM)while True:    msg=input('>>: ').strip()    udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))    data,server_addr=udp_client.recvfrom(1024)    print(data.decode('utf-8'))
UDP客户端

基于UDP的套接字不会发生粘包现象

from socket import *udp_server=socket(AF_INET,SOCK_DGRAM)udp_server.bind(('127.0.0.1',8080))data1,client_addr=udp_server.recvfrom(3)print('data1',data1)data2,client_addr=udp_server.recvfrom(1024)print('data2',data2)
UDP服务端
from socket import *udp_client=socket(AF_INET,SOCK_DGRAM)udp_client.sendto('hello'.encode('utf-8'),('127.0.0.1',8080))udp_client.sendto('world'.encode('utf-8'),('127.0.0.1',8080))
UDP客户端

并发的UDP套接字

#UDP服务端 import socketserverclass MyUDPhandler(socketserver.BaseRequestHandler):    def handle(self):        print(self.request)        self.request[1].sendto(self.request[0].upper(),self.client_address)if __name__ == '__main__':    s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler)    s.serve_forever()
#UDP客户端from socket import *udp_client=socket(AF_INET,SOCK_DGRAM)while True:    msg=input('>>: ').strip()    udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))    data,server_addr=udp_client.recvfrom(1024)    print(data.decode('utf-8'))

2、进程理论知识

进程是对正在运行程序的一个抽象。#一 操作系统的作用:    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口    2:管理、调度进程,并且将多个进程对硬件的竞争变得有序进程与程序的区别:程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。注:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,一个可以播放饭岛爱。同步执行:一个进程在执行某个任务时,另外一个进程必须等待其执行完毕,才能继续执行异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率
#开启进程的方式一from multiprocessing import Processimport timedef work(name):    print('task <%s> is runing' %name)    time.sleep(0.5)    print('task <%s> is done' % name)if __name__ == '__main__':      #windows系统开启子进程一定要写在main函数下。    # Process(target=work,kwargs={'name':'egon'})    p1=Process(target=work,args=('egon',))  #一定要加,表示此为元组    p2=Process(target=work,args=('alex',))    p1.start()    p2.start()    print('主')
#join方法 待子进程运行完后主进程开始运行from multiprocessing import Processimport timedef work(name):    print('task <%s> is runing' %name)    time.sleep(0.5)    print('task <%s> is done' % name)if __name__ == '__main__':    p1=Process(target=work,args=('egon',))    p2=Process(target=work,args=('alex',))    p3=Process(target=work,args=('yuanhao',))    p_l = [p1, p2, p3]    for p in p_l:        p.start()    for p in p_l:        p.join()    # p1.join() #主进程等,等待p1运行结束    # p2.join() #主进程等,等待p2运行结束    # p3.join() #主进程等,等待p3运行结束    print('主')
#开启子进程的方式二from multiprocessing import Processimport timeclass MyProcess(Process):    def __init__(self,name):        super().__init__()        self.name=name    def run(self):        print('task <%s> is runing' % self.name)        time.sleep(0.5)        print('task <%s> is done' % self.name)if __name__ == '__main__':    p=MyProcess('egon')    p.start()    print('主')
#并发的套接字通讯#服务端from multiprocessing import Processfrom socket import *s=socket(AF_INET,SOCK_STREAM)s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)s.bind(('127.0.0.1',8080))s.listen(5)def talK(conn,addr):    while True:        try:            data=conn.recv(1024)            if not data:break            conn.send(data.upper())        except Exception:            break    conn.close()if __name__ == '__main__':    while True:        conn,addr=s.accept()        p=Process(target=talK,args=(conn,addr))   #链接循环使用开启通讯循环子进程方式        p.start()    s.close()#客户端from socket import *c=socket(AF_INET,SOCK_STREAM)c.connect(('127.0.0.1',8080))while True:    msg=input('>>: ').strip()    if not msg:continue    c.send(msg.encode('utf-8'))    data=c.recv(1024)    print(data.decode('utf-8'))c.close()
#Process对象的其他方法和属性from multiprocessing import Processimport time,osdef work():    print('parent:%s task <%s> is runing' %(os.getppid(),os.getpid()))    time.sleep(1)    print('parent:%s task <%s> is done'  %(os.getppid(),os.getpid()))if __name__ == '__main__':p1=Process(target=work,args=('egon',),name='123123')  #指定进程名p1.start()p1.terminate() #强制中止进程 如果p1有子进程,会出现僵尸进程p1.is_alive()    #True os过了一段时间才能回收p1进程p1.name #进程名p1.pid     #进程号os.getpid() #当前进程的pidos.getppid  #父进程的pidwindows cmd tasklist|findstr python(pycharm)

 守护进程daemon

#守护进程daemon #其一:守护进程会在主进程代码执行结束后就终止 #其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children from multiprocessing import Processimport timedef work(name):    print('task <%s> is runing' %name)    time.sleep(0.5)    print('task <%s> is done' % name)if __name__ == '__main__':    p1=Process(target=work,args=('egon',))    p1.daemon = True  #子进程start之前必须要要指定daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行    p1.start()    print('主')#主进程代码运行完毕,守护进程就会结束from multiprocessing import Processimport timedef foo():    print(123)    time.sleep(1)    print("end123")def bar():    print(456)    time.sleep(3)    print("end456")if __name__ == '__main__':    p1=Process(target=foo)    p2=Process(target=bar)    p1.daemon=True    p1.start()    p2.start()    print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止

同步锁mutex

#竞争带来的结果就是错乱,如何控制,就是加锁处理 #同步锁mutexfrom multiprocessing import Process,Lockimport timedef work(name,mutex):    mutex.acquire()    #加锁    print('task <%s> is runing' %name)    time.sleep(2)    print('task <%s> is done' % name)    mutex.release()     #解锁if __name__ == '__main__':    mutex=Lock()    p1=Process(target=work,args=('egon',mutex))    #参数要指定mutex    p2=Process(target=work,args=('alex',mutex))    p1.start()    p2.start()    print('主') 加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是: 1.效率低 2.需要自己加锁处理

模拟抢票

#db.txt {"count": 1}  #序列化需要用双引号import jsonimport osimport timefrom multiprocessing import Process,Lockdef search():    dic=json.load(open('db.txt'))    print('\033[32m[%s] 看到剩余票数<%s>\033[0m' %(os.getpid(),dic['count']))def get_ticket():    dic = json.load(open('db.txt'))    time.sleep(0.5) #模拟读数据库的网络延迟    if dic['count'] > 0:        dic['count']-=1        time.sleep(0.5)  # 模拟写数据库的网络延迟        json.dump(dic,open('db.txt','w'))        print('\033[31m%s 购票成功\033[0m' %os.getpid())def task(mutex):    search()    mutex.acquire()    get_ticket()    mutex.release()if __name__ == '__main__':    mutex=Lock()    for i in range(10):        p=Process(target=task,args=(mutex,))        p.start()

共享数据

from multiprocessing import Process,Manager,Lockdef task(dic,mutex):    with mutex:        dic['count']-=1if __name__ == '__main__':    mutex=Lock()    m=Manager()    dic=m.dict({
'count':100}) p_l=[] for i in range(10): p=Process(target=task,args=(dic,mutex)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)

队列

#为此mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。 1 队列和管道都是将数据存放于内存中 2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。 from multiprocessing import Queueq=Queue(3)q.put('first')q.put('second')q.put('third')# q.put('fourth')   #满了会一直卡在这print(q.get())print(q.get())print(q.get())# print(q.get())       #空的话会一直卡在这#了解q=Queue(3)q.put('first',block=False)q.put('second',block=False)q.put('third',block=False)# q.put_nowait('fourth') == #q.put('fourth',block=False)   #满了不会卡在这,会抛出一个异常q.put('fourth',timeout=3)   #指定超时时间

生产者消费者模型

from multiprocessing import Process,Queueimport time,osdef producer(q,name):    for i in range(3):        time.sleep(1)        res='%s%s' %(name,i)        q.put(res)        print('\033[45m<%s> 生产了 [%s]\033[0m' %(os.getpid(),res))def consumer(q):    while True:        res=q.get()        if res is None:break        time.sleep(1.5)        print('\033[34m<%s> 吃了 [%s]\033[0m' % (os.getpid(), res))if __name__ == '__main__':    q=Queue()    #生产者们:即厨师们    p1=Process(target=producer,args=(q,'包子'))    p2=Process(target=producer,args=(q,'饺子'))    p3=Process(target=producer,args=(q,'馄饨'))    #消费者们:即吃货们    c1=Process(target=consumer,args=(q,))    c2=Process(target=consumer,args=(q,))    p1.start()    p2.start()    p3.start()    c1.start()    c2.start()    p1.join()    p2.join()    p3.join()   #待生产者们运行完毕    q.put(None) #队列中放入结束指定符,几个消费者就放入几个None    q.put(None)    print('主')

Joinable生产者消费者模型

#Joinablequeue#消费者发消息给生产者from multiprocessing import Process, JoinableQueueimport time, osdef producer(q, name):    for i in range(3):        time.sleep(1)        res = '%s%s' % (name, i)        q.put(res)        print('\033[45m<%s> 生产了 [%s]\033[0m' % (os.getpid(), res))    q.join()     #待进程运行完毕def consumer(q):    while True:        res = q.get()        time.sleep(1.5)        print('\033[34m<%s> 吃了 [%s]\033[0m' % (os.getpid(), res))        q.task_done()    #消费者发消息给生产者if __name__ == '__main__':    q = JoinableQueue()    # 生产者们:即厨师们    p1 = Process(target=producer, args=(q, '包子'))    p2 = Process(target=producer, args=(q, '饺子'))    p3 = Process(target=producer, args=(q, '馄饨'))    # 消费者们:即吃货们    c1 = Process(target=consumer, args=(q,))    c2 = Process(target=consumer, args=(q,))    c1.daemon=True  #消费者为守护进程       c2.daemon=True     p1.start()    p2.start()    p3.start()    c1.start()    c2.start()    p1.join()    print('主')

 进程池

创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程 #进程池Poolfrom multiprocessing import Poolimport os,timedef work(n):    print('task <%s> is runing' %os.getpid())    time.sleep(2)    return n**2if __name__ == '__main__':    # print(os.cpu_count()) #CPU个数获得方式    p=Pool(4)   #进程个数设置为CPU个数 #要创建的进程数,如果省略,将默认使用cpu_count()的值    res_l=[]    for i in range(10):        res=p.apply_async(work,args=(i,))   #异步方式提交任务        res_l.append(res)
#异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
p.close() #不允许再给进程池加任务 join前必须要执行close,否则程序有问题 p.join() #主进程等待进程池中任务执行结束 for res in res_l: print(res.get()) #从进程池中获得任务执行的结果 ##使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。     p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

进程池之回调函数

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
#回调函数callback=#进程池异步方式提交任务,进程池结果使用回调函数处理任务执行结果import requests #pip3 install requestsimport os,timefrom multiprocessing import Pooldef get_page(url):    print('<%s> get :%s' %(os.getpid(),url))    respone = requests.get(url)    if respone.status_code == 200:        return {
'url':url,'text':respone.text}def parse_page(dic): print('<%s> parse :%s' %(os.getpid(),dic['url'])) time.sleep(0.5) res='url:%s size:%s\n' %(dic['url'],len(dic['text'])) #模拟解析网页内容 with open('db.txt','a') as f: f.write(res)if __name__ == '__main__': p=Pool(4) urls = [ 'http://www.baidu.com', 'http://www.baidu.com', 'http://www.baidu.com', 'http://www.baidu.com', 'http://www.baidu.com', 'http://www.baidu.com', 'http://www.baidu.com', ] for url in urls: p.apply_async(get_page,args=(url,),callback=parse_page) p.close() p.join() print('主进程pid:',os.getpid())

进程池控制并发的套接字通信

#服务端from multiprocessing import Poolimport osfrom socket import *s=socket(AF_INET,SOCK_STREAM)s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)s.bind(('127.0.0.1',8080))s.listen(5)def talK(conn,addr):    print(os.getpid())    while True:        try:            data=conn.recv(1024)            if not data:break            conn.send(data.upper())        except Exception:            break    conn.close()if __name__ == '__main__':    p=Pool(4)    while True:        conn,addr=s.accept()        p.apply_async(talK,args=(conn,addr))    s.close()#客户端from socket import *c=socket(AF_INET,SOCK_STREAM)c.connect(('127.0.0.1',8080))while True:    msg=input('>>: ').strip()    if not msg:continue    c.send(msg.encode('utf-8'))    data=c.recv(1024)    print(data.decode('utf-8'))c.close()

paramiko模块

 
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作 pip3 install paramiko #在python3中
#用户名密码方式远程连接服务器执行命令获取结果import paramiko# 创建SSH对象ssh = paramiko.SSHClient()# 允许连接不在know_hosts文件中的主机ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())# 连接服务器ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd')# 执行命令stdin, stdout, stderr = ssh.exec_command('df')# 获取命令结果result = stdout.read()print(result.decode('utf-8'))# 关闭连接ssh.close()#公私钥方式远程连接服务器执行命令获取结果# import paramikoprivate_key = paramiko.RSAKey.from_private_key_file('id_rsa')# 创建SSH对象ssh = paramiko.SSHClient()# 允许连接不在know_hosts文件中的主机ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())# 连接服务器ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)# 执行命令stdin, stdout, stderr = ssh.exec_command('df')# 获取命令结果result = stdout.read()print(result.decode('utf-8'))# 关闭连接ssh.close()#paramiko利用sftp上传下载文件import paramikotransport = paramiko.Transport(('120.92.84.249', 22))transport.connect(username='root', password='123QWEasd')sftp = paramiko.SFTPClient.from_transport(transport)# 将location.py 上传至服务器 /tmp/test.pysftp.put('id_rsa', '/tmp/test.rsa')# 将remove_path 下载到本地 local_path# sftp.get('remove_path', 'local_path')transport.close()

 

转载于:https://www.cnblogs.com/liweijing/p/7443031.html

你可能感兴趣的文章
[bzoj2131]免费的馅饼 树状数组优化dp
查看>>
CreateMutex()参数报错问题
查看>>
Linux三剑客-常用命令
查看>>
Excel的列数以数字格式查看
查看>>
unity 2d 和 NGUI layer
查看>>
Sublime Text shift+ctrl妙用、Sublime Text快捷组合键大全
查看>>
spring security中当前用户信息
查看>>
[中国寒龙出品]VB程序设计视频第十四课,更多请关注我们的官博。
查看>>
LinuxMint 17.1 Cinnamon桌面窗口焦点bug
查看>>
PHP函数
查看>>
缩点 CF893C Rumor
查看>>
Spring详解篇之 AOP面向切面编程
查看>>
COMP0037 Coursework
查看>>
Java第三次作业
查看>>
数据库的安装步骤
查看>>
关于一些基础的Java问题的解答(七)
查看>>
迭代器模式
查看>>
python写简单的图形界面汉诺塔解题器
查看>>
Wabacus框架
查看>>
安卓titlebar的组合控件使用
查看>>