如何让一个python 多核的脚本跑满多核的CPU

Python 并发编程之使用多线程和多处理器 - 技术翻译 - 开源中国社区
Python 并发编程之使用多线程和多处理器
【已翻译100%】
英文原文:
推荐于 3年前 (共 9 段, 翻译完成于 05-07)
参与翻译&(6人)&: ,
在Python编码中我们经常讨论的一个方面就是如何优化模拟执行的性能。尽管在考虑量化代码时NumPy、SciPy和pandas在这方面已然非常有用,但在构建事件驱动系统时我们无法有效地使用这些工具。有没有可以加速我们代码的其他办法?答案是肯定的,但需要留意!
在这篇文章中,我们看一种不同的模型-并发,我们可以将它引入我们Python程序中。这种模型在模拟中工作地特别好,它不需要共享状态。Monte Carlo模拟器可以用来做期权定价以及检验算法交易等类型的各种参数的模拟。
我们将特别考虑库和库。
&翻译得不错哦!
Python并发
当Python初学者探索多线程的代码为了计算密集型优化时,问得最多的问题之一是:”当我用多线程的时候,为什么我的程序变慢了?“
在多核机器上,我们期望多线程的代码使用额外的核,从而提高整体性能。不幸的是,主Python解释器(CPython)的内部并不是真正的多线程,是通过一个全局解释锁(GIL)来进行处理的。
GIL是必须的,因为Python解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python对象的时候将有一个全局的强制锁。在任何时候,仅仅一个单一的线程能够获取Python对象或者C API。每100个字节的Python指令解释器将重新获取锁,这(潜在的)阻塞了I/0操作。因为锁,CPU密集型的代码使用线程库时,不会获得性能的提高,但是当它使用多处理库时,性能可以获得提高。
&翻译得不错哦!
并行库的实现
现在,我们将使用上面所提到的两个库来实现对一个“小”问题进行并发优化。
上面我们提到: 运行CPython解释器的Python不会支持通过多线程来实现多核处理。不过,Python确实有一个。那么如果我们(可能)不能使用多个核心进行处理,那么使用这个库能取得什么好处呢?
许多程序,尤其是那些与网络通信或者数据输入/输出(I/O)相关的程序,都经常受到网络性能或者输入/输出(I/O)性能的限制。这样Python解释器就会等待哪些从诸如网络地址或者硬盘等“远端”数据源读写数据的函数调用返回。因此这样的数据访问比从本地内存或者CPU缓冲区读取数据要慢的多。
因此,如果许多数据源都是通过这种方式访问的,那么就有一种方式对这种数据访问进行性能提高,那就是对每个需要访问的数据项都产生一个线程 。
&翻译得不错哦!
举个例子,假设有一段Python代码,它用来对许多站点的URL进行扒取。再假定下载每个URL所需时间远远超过计算机CPU对它的处理时间,那么仅使用一个线程来实现就会大大地受到输入/输出(I/O)性能限制。
通过给每个下载资源生成一个新的线程,这段代码就会并行地对多个数据源进行下载,在所有下载都结束的时候再对结果进行组合。这就意味着每个后续下载都不会等待前一个网页下载完成。此时,这段代码就受收到客户/服务端带宽的限制。
不过,许多与财务相关的应用都受到CPU性能的限制,这是因为这样的应用都是高度集中式的对数字进行处理。这样的应用都会进行大型线性代数计算或者数值的随机统计,比如进行蒙地卡罗模拟统计。所以只要对这样的应用使用Python和全局解释锁(GIL),此时使用Python线程库就不会有任何性能的提高。
&翻译得不错哦!
Python实现
下面这段依次添加数字到列表的“玩具”代码,举例说明了多线程的实现。每个线程创建一个新的列表并随机添加一些数字到列表中。这个已选的“玩具”例子对CPU的消耗非常高。
下面的代码概述了线程库的接口,但是他不会比我们用单线程实现的速度更快。当我们对下面的代码用多处理库时,我们将看到它会显著的降低总的运行时间。
让我们检查一下代码是怎样工作的。首先我们导入threading库。然后我们创建一个带有三个参数的函数list_append。第一个参数count定义了创建列表的大小。第二个参数id是“工作”(用于我们输出debug信息到控制台)的ID。第三个参数out_list是追加随机数的列表。
&翻译得不错哦!
__main__函数创建了一个107的size,并用两个threads执行工作。然后创建了一个jobs列表,用于存储分离的线程。threading.Thread对象将list_append函数作为参数,并将它附加到jobs列表。
最后,jobs分别开始并分别“joined”。join()方法阻塞了调用的线程(例如主Python解释器线程)直到线程终止。在打印完整的信息到控制台之前,确认所有的线程执行完成。
#&thread_test.pyimport&randomimport&threadingdef&list_append(count,&id,&out_list):
Creates&an&empty&list&and&then&appends&a&
random&number&to&the&list&'count'&number
of&times.&A&CPU-heavy&operation!
for&i&in&range(count):
out_list.append(random.random())if&__name__&==&"__main__":
size&=&&&&#&Number&of&random&numbers&to&add
threads&=&2&&&#&Number&of&threads&to&create
#&Create&a&list&of&jobs&and&then&iterate&through
#&the&number&of&threads&appending&each&thread&to
#&the&job&list&
for&i&in&range(0,&threads):
out_list&=&list()
thread&=&threading.Thread(target=list_append(size,&i,&out_list))
jobs.append(thread)
#&Start&the&threads&(i.e.&calculate&the&random&number&lists)
for&j&in&jobs:
#&Ensure&all&of&the&threads&have&finished
for&j&in&jobs:
print&"List&processing&complete."
我们能在控制台中调用如下的命令time这段代码
time&python&thread_test.py
将产生如下的输出
List&processing&complete.
real&&&&0m2.003s
user&&&&0m1.838s
sys&&&&&0m0.161s
注意user时间和sys时间相加大致等于real时间。这表明我们使用线程库没有获得性能的提升。我们期待real时间显著的降低。在并发编程的这些概念中分别被称为CPU时间和挂钟时间(wall-clock time)
&翻译得不错哦!
多进程处理库
为了充分地使用所有现代处理器所能提供的多个核心 ,我们就要使用 。它的工作方式与线程库完全不同 ,不过两种库的语法却非常相似 。
多进程处理库事实上对每个并行任务都会生成多个操作系统进程。通过给每个进程赋予单独的Python解释器和单独的全局解释锁(GIL)十分巧妙地规避了一个全局解释锁所带来的问题。而且每个进程还可独自占有一个处理器核心,在所有进程处理都结束的时候再对结果进行重组。
不过也存在一些缺陷。生成许多进程就会带来很多I/O管理问题,这是因为多个处理器对数据的处理会引起数据混乱 。这就会导致整个运行时间增多 。不过,假设把数据限制在每个进程内部 ,那么就可能大大的提高性能 。当然,再怎么提高也不会超过所规定的极限值。
&翻译得不错哦!
Python实现
使用Multiprocessing实现仅仅需要修改导入行和multiprocessing.Process行。这里单独的向目标函数传参数。除了这些,代码几乎和使用Threading实现的一样:
#&multiproc_test.pyimport&randomimport&multiprocessingdef&list_append(count,&id,&out_list):
Creates&an&empty&list&and&then&appends&a&
random&number&to&the&list&'count'&number
of&times.&A&CPU-heavy&operation!
for&i&in&range(count):
out_list.append(random.random())if&__name__&==&"__main__":
size&=&&&&#&Number&of&random&numbers&to&add
procs&=&2&&&#&Number&of&processes&to&create
#&Create&a&list&of&jobs&and&then&iterate&through
#&the&number&of&processes&appending&each&process&to
#&the&job&list&
for&i&in&range(0,&procs):
out_list&=&list()
process&=&multiprocessing.Process(target=list_append,&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&args=(size,&i,&out_list))
jobs.append(process)
#&Start&the&processes&(i.e.&calculate&the&random&number&lists)
for&j&in&jobs:
#&Ensure&all&of&the&processes&have&finished
for&j&in&jobs:
print&"List&processing&complete."
控制台测试运行时间:
time&python&multiproc_test.py
得到如下输出:
List&processing&complete.
real&&&&0m1.045s
user&&&&0m1.824s
sys&&&&&0m0.231s
&翻译得不错哦!
在这个例子中可以看到user和sys时间基本相同,而real下降了近两倍。之所以会这样是因为我们使用了两个进程。扩展到四个进程或者将列表的长度减半结果如下(假设你的电脑至少是四核的):
List&processing&complete.
real&&&&0m0.540s
user&&&&0m1.792s
sys&&&&&0m0.269s
使用四个进程差不多提高了3.8倍速度。但是,在将这个规律推广到更大范围,更复杂的程序上时要小心。数据转换,硬件cacha层次以及其他一些问题会减弱加快的速度。
在下一篇文章中我们会将Event-Driben Basketer并行化,从而提高其运行多维参数寻优的能力。
相关阅读:
&翻译得不错哦!
我们的翻译工作遵照 ,如果我们的工作有侵犯到您的权益,请及时联系我们
js lua有线程吗?
引用来自“Mallon”的评论js lua有线程吗?lua是非线程安全的,js一般都是一个线程跑,依靠异步回调机制并发。
引用来自“Mallon”的评论js lua有线程吗?引用来自“郭煜”的评论lua是非线程安全的,js一般都是一个线程跑,依靠异步回调机制并发。而且这几个语言的使用场合都不一样
引用来自“Mallon”的评论js lua有线程吗?引用来自“郭煜”的评论lua是非线程安全的,js一般都是一个线程跑,依靠异步回调机制并发。HTML5的worker则是多线程模型
引用来自“Mallon”的评论js lua有线程吗?引用来自“郭煜”的评论lua是非线程安全的,js一般都是一个线程跑,依靠异步回调机制并发。引用来自“独爱剑走偏锋”的评论HTML5的worker则是多线程模型HTML5 Worker 不能操作直接操作 DOM?
我觉得还是可以的,只要是系统自带的库,因为这些库是按功能组织的,名字并不是专指某个特定实现。除非是外部的库(特定的实现),才用原名(可能是英文)来写上。
引用来自“Mallon”的评论js lua有线程吗?引用来自“郭煜”的评论lua是非线程安全的,js一般都是一个线程跑,依靠异步回调机制并发。引用来自“独爱剑走偏锋”的评论HTML5的worker则是多线程模型引用来自“yfwz100”的评论HTML5 Worker 不能操作直接操作 DOM?嗯 至少目前不行,,得要交给master线程来完成10730 的BLOG
用户名:10730
文章数:110
评论数:259
访问量:252720
注册日期:
阅读量:5863
阅读量:12276
阅读量:416726
阅读量:1104169
51CTO推荐博文
& & 刚刚学python,根据需求自己动手写了几个监控系统性能的脚本,原理比较简单,都是通过SNMP协议获取系统信息,再进行相应的计算和格式化,最后输出结果。 & & 以下的脚本都是通过os.popen模块调用snmpwalk 命令获取到的MIB 信息,原本想用py-snmp,不过介于网上资源太少,而且官网的例子看不太懂,只能用这种方法替代。后期再尝试用py-snmp来做这些事情。由于这些脚本都是调用snmpwalk命令来获取信息,所以被监控的机器上需要支持snmp,可以执行# yum install -y net-snmp* 下面是几个脚本的代码,第一次写,写的不够规范,也不够简洁,还请大家多指教。监控网卡流量#!/usr/bin/python
#get SNMP-MIB2 of the devices
def getAllitems(host,oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('\n')[:-1]
return sn1
#get network device
def getDevices(host):
device_mib = getAllitems(host,'RFC1213-MIB::ifDescr')
device_list = []
for item in device_mib:
if re.search('eth',item):
device_list.append(item.split(':')[3].strip())
return device_list
#get network date
def getDate(host,oid):
date_mib = getAllitems(host,oid)[1:]
for item in date_mib:
byte = float(item.split(':')[3].strip())
date.append(str(round(byte/1024,2)) + ' KB')
return date
if __name__ == '__main__':
hosts = ['192.168.30.111','192.168.30.112']
for host in hosts:
device_list = getDevices(host)
inside = getDate(host,'IF-MIB::ifInOctets')
outside = getDate(host,'IF-MIB::ifOutOctets')
print '==========' + host + '=========='
for i in range(len(inside)):
print '%s : RX: %-15s
TX: %s ' % (device_list[i], inside[i], outside[i])
print监控内存(swap)使用率#!/usr/bin/python
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('\n')[:-1]
return sn1
def getSwapTotal(host):
swap_total = getAllitems(host, 'UCD-SNMP-MIB::memTotalSwap.0')[0].split(' ')[3]
return swap_total
def getSwapUsed(host):
swap_avail = getAllitems(host, 'UCD-SNMP-MIB::memAvailSwap.0')[0].split(' ')[3]
swap_total = getSwapTotal(host)
swap_used = str(round(((float(swap_total)-float(swap_avail))/float(swap_total))*100 ,2)) + '%'
return swap_used
def getMemTotal(host):
mem_total = getAllitems(host, 'UCD-SNMP-MIB::memTotalReal.0')[0].split(' ')[3]
return mem_total
def getMemUsed(host):
mem_total = getMemTotal(host)
mem_avail = getAllitems(host, 'UCD-SNMP-MIB::memAvailReal.0')[0].split(' ')[3]
mem_used = str(round(((float(mem_total)-float(mem_avail))/float(mem_total))*100 ,2)) + '%'
return mem_used
if __name__ == '__main__':
hosts = ['192.168.30.111','192.168.30.112']
print "Monitoring Memory Usage"
for host in hosts:
mem_used = getMemUsed(host)
swap_used = getSwapUsed(host)
print '==========' + host + '=========='
print 'Mem_Used = %-15s
Swap_Used = %-15s' % (mem_used, swap_used)
print监控系统负载#!/usr/bin/python
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('\n')
return sn1
def getload(host,loid):
load_oids = '1.3.6.1.4.1..3.' + str(loid)
return getAllitems(host,load_oids)[0].split(':')[3]
if __name__ == '__main__':
hosts = ['192.168.30.111','192.168.30.112']
#check_system_load
print '==============System Load=============='
for host in hosts:
load1 = getload(host, 1)
load10 = getload(host, 2)
load15 = getload(host, 3)
print '%s load(1min): %s ,load(10min): %s ,load(15min): %s' % (host,load1,load10,load15)监控CPU#!/usr/bin/python
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid + '|grep Raw|grep Cpu|grep -v Kernel').read().split('\n')[:-1]
return sn1
def getDate(host):
items = getAllitems(host, '.1.3.6.1.4.1.2021.11')
cpu_total = 0
#us = us+ni, sy = sy + irq + sirq
for item in items:
float_item = float(item.split(' ')[3])
cpu_total += float_item
if item == items[0]:
date.append(float(item.split(' ')[3]) + float(items[1].split(' ')[3]))
elif item == item[2]:
date.append(float(item.split(' ')[3] + items[5].split(' ')[3] + items[6].split(' ')[3]))
date.append(float_item)
#calculate cpu usage percentage
for item in date:
rate.append((item/cpu_total)*100)
mean = ['%us','%ni','%sy','%id','%wa','%cpu_irq','%cpu_sIRQ']
#calculate cpu usage percentage
result = map(None,rate,mean)
return result
if __name__ == '__main__':
hosts = ['192.168.30.111','192.168.30.112']
for host in hosts:
print '==========' + host + '=========='
result = getDate(host)
print 'Cpu(s)',
#print result
for i in range(5):
print ' %.2f%s' % (result[i][0],result[i][1]),
print监控磁盘#!/usr/bin/python
def getAllitems(host,oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('\n')[:-1]
return sn1
def getDate(source,newitem):
for item in source[5:]:
newitem.append(item.split(':')[3].strip())
return newitem
def getRealDate(item1,item2,listname):
for i in range(len(item1)):
listname.append(int(item1[i])*int(item2[i])/1024)
return listname
def caculateDiskUsedRate(host):
hrStorageDescr = getAllitems(host, 'HOST-RESOURCES-MIB::hrStorageDescr')
hrStorageUsed = getAllitems(host, 'HOST-RESOURCES-MIB::hrStorageUsed')
hrStorageSize = getAllitems(host, 'HOST-RESOURCES-MIB::hrStorageSize')
hrStorageAllocationUnits = getAllitems(host, 'HOST-RESOURCES-MIB::hrStorageAllocationUnits')
disk_list = []
hrsused = []
hrsize = []
hrsaunits = []
#get disk_list
for item in hrStorageDescr:
if re.search('/',item):
disk_list.append(item.split(':')[3])
#print disk_list
getDate(hrStorageUsed,hrsused)
getDate(hrStorageSize,hrsize)
#print getDate(hrStorageAllocationUnits,hrsaunits)
#get hrstorageAllocationUnits
for item in hrStorageAllocationUnits[5:]:
hrsaunits.append(item.split(':')[3].strip().split(' ')[0])
#caculate the result
#disk_used = hrStorageUsed * hrStorageAllocationUnits /1024 (KB)
disk_used = []
total_size = []
disk_used = getRealDate(hrsused,hrsaunits,disk_used)
total_size = getRealDate(hrsize,hrsaunits,total_size)
diskused_rate = []
for i in range(len(disk_used)):
diskused_rate.append(str(round((float(disk_used[i])/float(total_size[i])*100), 2)) + '%')
return diskused_rate,disk_list
if __name__ == '__main__':
hosts = ['192.168.30.111','192.168.30.112']
for host in hosts:
result = caculateDiskUsedRate(host)
diskused_rate = result[0]
partition = result[1]
print "==========",host,'=========='
for i in range(len(diskused_rate)):
print '%-20s used: %s' % (partition[i],diskused_rate[i])
print执行结果本文出自 “” 博客,请务必保留此出处
了这篇文章
类别:┆阅读(0)┆评论(0)
15:27:43 16:51:13 12:32:05 18:36:09 21:32:13 16:36:45 17:10:50多核cpu怎样实现让一个cpu密集型的程序占用所有cpu?
多线程/多进程 &写个死循环。
RunTime.getRuntime().availableProcessors()获得处理器数量,创建一个这么数量的线程池,这样只能保证你的程序高效运行,但是不能保证占用所有CPU&
java8的fork/join倒是有这方面的优化
linux sched_setaffinity &windows setthreadaffinitmask &绑定进程到某个CPU,然后通过系统调用找到当前系统CPU使用率,动态的调整sleep的时间,编程之美详细的了,,有回复时邮件通知我
Linux--不是那么难,学习Linux--那么难是不?Legion是一个个人的linux学习记录型博客,本人不望博客多出名,只期待自己能把自己在linux路上的点点滴滴都做个记录。只期望自己能很好的坚持写下去。
记住我的登录信息
输入用户名或电子邮箱地址,您会收到一封新密码链接的电子邮件。
用户名或电子邮件地址}

我要回帖

更多关于 python 利用多核cpu 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信