CDA数据分析师 出品
相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。以下给大家讲解关于数据并行化这方面的内容。
随着时间和处理器计算能力的增长,数据呈指数级增长,我们需要找到有效地处理数据的方法。那我们应该怎么办呢?
GPU是一种非常有效的解决方案。但是,GPU并不是为了机器学习而设计的,它是专门为复杂的图像处理和游戏而设计的。我们使算法能够在现有GPU上运行,并且确实取得了成果。现在,谷歌推出了一种名为TPU(张量处理单元)的新设备,该设备专门针对TensorFlow上的机器学习工作而量身定做的,其结果确实令人激动。同时英伟达在这方面也并没有退缩。
但是我们将来会在某个时候达到顶峰。即使我们我们现在拥有大量可用的数据集,但是单台机器或计算单元也不足以处理这样的负载。我们将不得不使用多台机器来完成我们的任务。我们将不得不并行化完成我们的任务。
接下来,我们将研究大多数情况下你将在Python中使用的一些方法。然后再介绍一下Dask和torch.multiprocessing。
Python库的Pool和Process方法都来自于multiprocessing它为我们的任务启动了一个新的过程,但是方式有所不同。Process每次调用仅执行一个进程:
import multiprocessing as mpp = mp.Process(target= ##目标函数, args= ##参数到函数)# 此调用将只生产一个进程,该进程将处理在后台使用给定的参数处理目标函数
但是这个过程还没有开始。要启动它,你必须执行以下操作:
p.start
现在,你可以将其保留在此处,或者通过以下方式检查该过程是否完成:
p.join#现在它将等待进程完成。
不检查过程是否已完成有许多用途。例如,在客户端-服务器应用程序中,数据包丢失的可能性或无响应进程的可能性确实很低,我们可以忽略它,这可以使我们的速度大大提高。[取决于申请程序]
对于多个进程,你必须创建多个Process。你想做多少就可以做多少。当你调用.start它们时,它们全部都将会启动。
processes =[mp.Process(target=func, args=(a, b)) for (a, b) in list]for p in processes: p.startfor p in processes: p.join
另一方面, Pool启动固定数量的进程,然后我们可以为这些进程分配一些任务。因此,在特定的时间实例中,只有固定数量的进程将在运行,其余的将在等待状态中。进程的数量通常被选作设备的内核数,如果此参数为空,也是可以作为默认的状态的。
pool = mp.Pool(processes=2)
现在有许多方法可以应用在Pool。在Data Science中,我们可以避免使用的是Pool.apply和Pool.map,因为它们会在任务完成后立即返回结果。Pool.apply仅采用一个参数,并且仅使用一个过程,而Pool.map将接受许多参数,并将其放入我们Pool的过程中。
results = [pool.apply(func, (x)) for x in X]# 或者 results = pool.map(func, (arg)) # 仅需要一个参数
考虑到我们前面的客户端-服务器应用程序的例子,此处预定义了要运行的最大进程数,因此,如果我们有很多请求/数据包,则n(仅在Pool中的最大进程)将运行一次,而其他将在等待其中一个进程插槽的队列中排队。
向量的所有元素的平方
# 我们如何使用数据框# A: 你可以使用一些可以并行化的函数df.shape# (100, 100)dfs = [df.iloc[i*25:i*25+25, 0] for i in range(4)]with Pool(4) as p: res = p.map(np.exp, dfs)for i in range(4): df.iloc[i*25:i*25+25, 0] = res[i]# 它可以方便的对数据进行预处理
什么时候使用什么?
如果你有很多任务,但其中很少的任务是计算密集型的,则应使用Process。因为如果它们需要大量计算,它们可能会阻塞你的CPU,并且你的系统可能会崩溃。如果你的系统可以一次处理所有这些操作,那么他们就不必在队列中等待机会了。
并且当你的任务数量固定且它们的计算量很大时,应使用Pool。因为你同时释放他们,那么你的系统很可能会崩溃。
什么!线程处理在python中进行?
python中的线程声誉。人们的这一点看法是对的。实际上,线程在大多数情况下是不起作用的。那么问题到底是什么呢?
问题就出在GIL(全局解释器锁定)上。GIL是在Python的开发初期就引入的,当时甚至在操作系统中都没有线程的概念。选择它是因为它的简单性。
GIL一次仅允许一个CPU进程。也就是说,它一次仅允许一个线程访问python解释器。因此,一个线程将整个解释器Lock,直到它完成。
对于单线程程序,它非常快,因为只有一个Lock要维护。随着python的流行,有效地推出GIL而不损害所有相关应用程序变得越来越困难。这就是为什么它仍然存在的原因。
但是,如果你的任务不受CPU限制,则仍然可以使用多线程并行(y)。也就是说,如果你的任务受I / O约束,则可以使用多个线程并获得加速。因为大多数时候这些任务都在等待其他代理(例如磁盘等)的响应,并且在这段时间内它们可以释放锁,而让其他任务同时获取它。⁴
NOTE: (来自于官方网页)The GIL is controversial because it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations. Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL. Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck.
以下是对官方网页的解释:
GIL是有争议的,因为它阻止多线程CPython程序在某些情况下充分利用多处理器系统。注意,潜在的阻塞或长时间运行的操作,如I/O、图像处理和NumPy数字处理,都发生在GIL之外。因此,只有在花费大量时间在GIL内部解释CPython字节码的多线程程序中,GIL才会成为瓶颈。
因此,如果你的任务受IO限制,例如从服务器下载一些数据,对磁盘进行读/写等操作,则可以使用多个线程并获得加速。
from threading import Thread as timport queueq = queue.Queue # 用于放置和获取线程的结果func_ = lambda q, args: q.put(func(args))threads = [t(target=func_, args=(q, args)) for args in args_array]for t in threads: t.startfor t in threads: t.joinres = for t in threads: res.append(q.get) # 这些结果不一定是按顺序排列的
要保存线程的结果,可以使用类似于Queue 的方法。为此,你将必须如上所示定义函数,或者可以在函数内部使用Queue.put,但是为此,你必须更改函数定义以Queue`做为参数。
现在,你在队列中的结果不一定是按顺序排列的。如果希望结果按顺序排列,则可以传入一些计数器作为参数,如id作为参数,然后使用这些id来标识结果的来源。
threads = [t(func_, args = (i, q, args)) for i, args in enumerate(args_array)]# 并相应地更新函数NOTE:在pandas中的多处理中由于某些原因 'read.csv' 的方法并没有提供太多的加速,你可以考虑使用Dask做为替代
线程还是进程?
一个进程是重量级的,因为它可能包含许多自己的线程(包含至少一个线程),并且分配了自己的内存空间,而线程是轻量级的,因为它在父进程的内存区域上工作,因此制作起来更快。
进程内的线程之间的通信比较容易,因为它们共享相同的内存空间。而进程间的通信(IPC-进程间通信)则比较慢。但是,共享相同数据的线程又可能进入竞争状态,应谨慎使用Locks或使用类似的解决方案。
Dask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。
他们也有一个单独的机器学习库dask-ml,这与如现有的库(如sklearn,xgboost和tensorflow)集成在一起。
from dask import delayed as delay@delaydef add(x, y): return x+y@delaydef sq(x): return x**2# 现在你可以以任何方式使用这些函数,Dask将使你的执行并行化。顾名思义,Dask不会立即执行函数调用,而是根据对输入和中间结果调用函数的方式生成计算图。计算最终结果:result.compute
Dask在做任何事情的时候都有一种内在的并行性。对于如何处理DataFrame的,你可以将其视为分而治之的方法,它将DataFrame分为多个块,然后并行应用给定的函数。
df = dask.DataFrame.read_csv("BigFile.csv", chunks=50000)# 你的DataFrame已经被划分为了多个块,你应用的每个函数将分别并行的应用所有的模块。它有大部分的Pandas功能,你可以使用:agg = df.groupby(["column"]).aggregate(["sum", "mean"])agg.columns = new_column_namesdf_new = df.merge(agg.reset_index, on="column", how="left")# 虽然到目前为止还没有计算结果,但是使用.compute可以并行计算。df_new.compute.head
它们还具有用于在计算机集群上运行它们的接口。
torch.multiprocessing是Python multiprocessing模块的封装函数,其API与原始模块100%兼容。因此,你可以在此处使用Python的 multiprocessing模块中的Queue',Pipe',Array'等。此外,为了使其更快,他们添加了一个方法,share_memory_该方法允许数据进入一个状态,在这个状态下任何进程都可以直接使用它,因此将该数据作为参数传递给不同的进程不会复制该数据。 。
你可以共享Tensors,模型的parameters,也可以根据需要在CPU或GPU上共享它们。
来自Pytorch的警告:(关于GPU上的共享) CUDA API要求导出到其他进程的分配在被其他进程使用时仍然有效。你应该小心,确保你共享的CUDA张量不会超出范围,只要有必要。这对于共享模型参数应该不是问题,但是传递其他类型的数据时应该小心。注意,这个限制不适用于共享CPU内存。
你可以在此处的"Pool and Process"部分中使用上面的方法,并且要获得更快的速度,可以使用share_memory_方法在所有进程之间共享一个Tensor(例如)而不被需要复制。
# 使用多个过程训练一个模型:import torch.multiprocessing as mpdef train(model): for data, labels in data_loader: optimizer.zero_grad loss_fn(model(data), labels).backward optimizer.step # 这将更新共享参数model = nn.Sequential(nn.Linear(n_in, n_h1), nn.ReLU, nn.Linear(n_h1, n_out))model.share_memory #需要"fork"方法工作processes = for i in range(4): # NO.的过程 p = mp.Process(target=train, args=(model,)) p.start processes.append(p)for p in processes: p.join
下一期继续看加快Python算法的第4种方法——Dask!
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
CDA持证人简介 刘伟,美国 NAU 大学计算机信息技术硕士, CDA数据分析师三级持证人,现任职于江苏宝应农商银行数据治理岗。 学 ...
2025-04-21持证人简介:贺渲雯 ,CDA 数据分析师一级持证人,互联网行业数据分析师 今天我将为大家带来一个关于用户私域用户质量数据分析 ...
2025-04-18一、CDA持证人介绍 在数字化浪潮席卷商业领域的当下,数据分析已成为企业发展的关键驱动力。为助力大家深入了解数据分析在电商行 ...
2025-04-17CDA持证人简介:居瑜 ,CDA一级持证人,国企财务经理,13年财务管理运营经验,在数据分析实践方面积累了丰富的行业经验。 一、 ...
2025-04-16持证人简介: CDA持证人刘凌峰,CDA L1持证人,微软认证讲师(MCT)金山办公最有价值专家(KVP),工信部高级项目管理师,拥有 ...
2025-04-15持证人简介:CDA持证人黄葛英,ICF国际教练联盟认证教练,前字节跳动销售主管,拥有丰富的行业经验。在实际生活中,我们可能会 ...
2025-04-14在 Python 编程学习与实践中,Anaconda 是一款极为重要的工具。它作为一个开源的 Python 发行版本,集成了众多常用的科学计算库 ...
2025-04-14随着大数据时代的深入发展,数据运营成为企业不可或缺的岗位之一。这个职位的核心是通过收集、整理和分析数据,帮助企业做出科 ...
2025-04-11持证人简介:CDA持证人黄葛英,ICF国际教练联盟认证教练,前字节跳动销售主管,拥有丰富的行业经验。 本次分享我将以教培行业为 ...
2025-04-11近日《2025中国城市长租市场发展蓝皮书》(下称《蓝皮书》)正式发布。《蓝皮书》指出,当前我国城市住房正经历从“增量扩张”向 ...
2025-04-10在数字化时代的浪潮中,数据已经成为企业决策和运营的核心。每一位客户,每一次交易,都承载着丰富的信息和价值。 如何在海量客 ...
2025-04-09数据是数字化的基础。随着工业4.0的推进,企业生产运作过程中的在线数据变得更加丰富;而互联网、新零售等C端应用的丰富多彩,产 ...
2025-04-094月7日,美国关税政策对全球金融市场的冲击仍在肆虐,周一亚市早盘,美股股指、原油期货、加密货币、贵金属等资产齐齐重挫,市场 ...
2025-04-08背景 3月26日,科技圈迎来一则重磅消息,苹果公司宣布向浙江大学捐赠 3000 万元人民币,用于支持编程教育。 这一举措并非偶然, ...
2025-04-07在当今数据驱动的时代,数据分析能力备受青睐,数据分析能力频繁出现在岗位需求的描述中,不分岗位的任职要求中,会特意标出“熟 ...
2025-04-03在当今数字化时代,数据分析师的重要性与日俱增。但许多人在踏上这条职业道路时,往往充满疑惑: 如何成为一名数据分析师?成为 ...
2025-04-02最近我发现一个绝招,用DeepSeek AI处理Excel数据简直太爽了!处理速度嘎嘎快! 平常一整天的表格处理工作,现在只要三步就能搞 ...
2025-04-01你是否被统计学复杂的理论和晦涩的公式劝退过?别担心,“山有木兮:统计学极简入门(Python)” 将为你一一化解这些难题。课程 ...
2025-03-31在电商、零售、甚至内容付费业务中,你真的了解你的客户吗? 有些客户下了一两次单就消失了,有些人每个月都回购,有些人曾经是 ...
2025-03-31在数字化浪潮中,数据驱动决策已成为企业发展的核心竞争力,数据分析人才的需求持续飙升。世界经济论坛发布的《未来就业报告》, ...
2025-03-28