CDA数据分析师 出品
相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下python的计算时间,减少大家在算法上的等待时间。今天给大家讲述最后一方面的内容,关于Dask的方法运用。
随着对机器学习算法并行化的需求不断增加,由于数据大小甚至模型大小呈指数级增长,如果我们拥有一个工具,可以帮助我们并行化处理Pandas的DataFrame,可以并行化处理Numpy的计算,甚至并行化我们的机器学习算法(可能是来自sklearn和Tensorflow的算法)也没有太多的麻烦,那它对我们会非常有帮助。
好消息是确实存在这样的库,其名称为Dask。Dask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。
他们也有一个单独的机器学习库dask-ml,这与如现有的库集成如sklearn,xgboost和tensorflow。
Dask通过绘制任务之间的交互图来并行化分配给它的任务。使用Dask的.visualize方法来可视化你的工作将非常有帮助,该方法可用于所有数据类型以及你计算的复杂任务链。此方法将输出你的任务图,并且如果你的任务在每个级别具有多个节点(即,你的任务链结构在多个层次上具有许多独立的任务,例如数据块上的并行任务),然后Dask将能够并行化它们。
注意: Dask仍是一个相对较新的项目。它还有很长的路要走。不过,如果你不想学习全新的API(例如PySpark),Dask是你的最佳选择,将来肯定会越来越好。Spark / PySpark仍然遥遥领先,并且仍将继续改进。这是一个完善的Apache项目。
Dask中的每种数据类型都提供现有数据类型的分布式版本,例如pandas中的DataFrame、numpy中的ndarray和Python中的list。这些数据类型可以大于你的内存,Dask将以Blocked方式对数据并行(y)运行计算。Blocked从某种意义上说,它们是通过执行许多小的计算(即,以块为单位)来执行大型计算的,而块的数量为chunks的总数。
a)数组:
网格中的许多Numpy数组作为Dask数组
Dask Array对非常大的数组进行操作,将它们划分为块并并行执行这些块。它有许多可用的numpy方法,你可以使用这些方法来加快速度。但是其中一些没有实现。
只要支持numpy切片,Dask Array就可以从任何类似数组结构中读取数据,并且可以通过使用并且通过使用Dask . Array .from_array方法具有.shape属性。它还可以读取.npy和.zarr文件。
import dask.array as daimport numpy as nparr = numpy.random.randint(1, 1000, (10000, 10000))darr = da.from_array(arr, chunks=(1000, 1000))# 它会生成大小为(1000,1000)的块darr.npartitioins# 100
当你的数组真的很重时(即它们无法放入内存)并且numpy对此无能为力时,可以使用它。因此,Dask将它们分为数组块并为你并行处理它们。
现在,Dask对每种方法进行惰性评估。因此,要实际计算函数的值,必须使用.compute方法。它将以块为单位并行计算结果,同时并行化每个独立任务。
result = darr.compute
1)元素数量较少时,Numpy比Dask快;2)Dask接管了Numpy,耗时约1e7个元素;3)Numpy无法产生更多元素的结果,因为它无法将它们存储在内存中。
b)DataFrame:
5个Pandas DataFrame在一个Dask DataFrame中提供每月数据(可以来自diff文件)
与Dask Arrays相似,Dask DataFrames通过将文件划分为块并将这些块的计算函数并行化,从而对不适合内存非常大的数据文件进行并行计算。
import dask.dataframe as dddf = dd.read_csv("BigFile(s).csv", blocksize=50e6)
现在,你可以应用/使用pandas库中可用的大多数功能,并在此处应用。
agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])agg.columns = new_column_names #请查看notebookdf_new = df.merge(agg.reset_index, on="column", how="left")df_new.compute.head
c)Bag:
Dask Bag包并行处理包含多个数据类型元素Python的list相似对象。当你尝试处理一些半结构化数据(例如JSON Blob或日志文件)时,此功能很有用。
import dask.bag as dbb = db.from_txt("BigSemiStructuredData.txt")b.take(1)
Daskbag逐行读取,.take方法输出指定行数的元组。
Dask Bag在这样的Python对象集合上实现例如map,filter,fold,和groupby等操作。它使用Python迭代器并行地完成这个任务,占用的内存很小。它类似于PyToolz的并行版本或PySpark RDD的Python版本。
filtered = b.filter(lambda x: x["Name"]=="James")\ .map(lambda x: x["Address"] = "New_Address")filtered.compute
如果你的任务有点简单,并且你不能或不想使用这些高级集合来执行操作,则可以使用低级调度程序,该程序可帮助你使用dask.delayed接口并行化代码/算法。dask.delayed也可以进行延迟计算。
import dask.delayed as delay@delaydef sq(x): return x**2@delay def add(x, y): return x+y@delay def sum(arr): sum=0 for i in range(len(arr)): sum+=arr[i] return sum
你可以根据需要在这些函数之间添加复杂的交互,使用上一个任务的结果作为下一个任务的参数。Dask不会立即计算这些函数,而是会为你的任务绘制图形,有效地合并你使用的函数之间的交互。
inputs = list(np.arange(1, 11))#将外接程序 dask.delayed 加入到列表temp = for i in range(len(inputs)): temp.append(sq(inputs[i])) # 计算输入的sq并保存 # 延迟计算在列表inputs=temp; temp = for i in range(0, len(inputs)-1, 2): temp.append(add(inputs[i]+inputs[i+1])) # 添加两个连续 # 结果从prev步骤inputs = tempresult = sum(inputs) # 将所有prev步骤的结果相加results.compute
你可以将延迟添加到具有许多可能的小块的任何可并行化代码中,从而获得加速的效果。它可以是你想计算的许多函数,例如上面的示例,或者可以使用并行读取多个文件pandas.read_csv。
首先,到目前为止,我们一直使用Dask的默认调度器来计算任务的结果。但是你可以根据需要从Dask提供的选项中更改它们。
Dask 带有四个可用的调度程序:
· threaded:由线程池支持的调度程序
· processes:由进程池支持的调度程序
· single-threaded(又名" sync"):同步调度程序,用于调试
· distributed:用于在多台计算机上执行图形的分布式调度程序
result.compute(scheduler="single-threaded") #用于调试# 或者dask.config.set(scheduler="single-threaded")result.compute#注:(从官方网页)#当被称为GIL的函数释放时,线程任务将工作得很好,而多处理总是启动时间较慢,并且在任务之间需要大量的通信。# 你可以通过其中一个得到调度程序 commands:dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync#单线程的最后一个
但是,Dask还有一个调度器,dask.distributed由于以下原因它可能是首选使用的:
1. 它提供了异步API的访问,尤其是Future,
1. 它提供了一个诊断仪表板,可以提供有关性能和进度的宝贵见解
1. 它可以更复杂地处理数据位置,因此在需要多个流程的工作负载上,它比多处理调度程序更有效。
你可以创建一个Dask的dask.distributed调度程序,通过导入和创建客户端实现分布式调度器
from dask.distributed import Clientclient = Client # Set up a local cluster# 你可以导航到http://localhost:8787/status 查看# 诊断仪表板,如果你有Bokeh安装的话
现在,你可以使用client.submit方法,将函数和参数作为其参数,从而将任务提交到此集群。然后我们可以使用client.gather或.result方法来收集结果。
sent = client.submit(sq, 4) # sq: square 函数result = client.gather(sent) # 或者 sent.result
你也可以仅使用dask.distributed.progress来查看当前单元格中任务的进度。你还可以明确选择使用dask.distributed.wait来等待任务完成。
Note: (Local Cluster)有时您会注意到Dask正在超出内存使用,即使它正在划分任务。它可能发生在您身上,因为您试图在数据集上使用的函数需要您的大部分数据进行处理,而多重处理可能使情况变得更糟,因为所有工作人员都可能试图将数据集复制到内存中。这可能发生在聚合的情况下。或者您可能想限制Dask只使用特定数量的内存。
在这些情况下,您可以使用Dask.distributed。LocalCluster参数,并将它们传递给Client,从而使用本地机器的核心构建LocalCluster。
from dask.distributed import Client, LocalClusterclient = Client(n_workers=1, threads_per_worker=1, processes=False, memory_limit='25GB', scheduler_port=0, silence_logs=True, diagnostics_port=0)client
'scheduler_port=0'和' stics_port=0'将为这个特定的客户端选择随机端口号。在'process =False'的情况下,dask的客户端不会复制数据集,这可能发生在您所创建的每个进程中。您可以根据自己的需要或限制对客户机进行调优,要了解更多信息,可以查看LocalCluster的参数。您还可以在同一台机器的不同端口上使用多个客户机。
Dask也有一个库,可以帮助并允许大多数流行的机器学习库,例如sklearn,tensorflow和xgboost。
在机器学习中,你可能会遇到几个不同的问题。而具体的策略取决于你面临的问题:
1. 大型模型:数据适合放入RAM,但是训练时间太长。许多超参数组合,许多模型的大型集合等。
1. 大型数据集:数据大于RAM,并且不能选择抽样。
因此,你应该:
· 对于内存中适合的问题,只需使用scikit-learn(或你最喜欢的ML库)即可;
· 对于大型模型,请使用dask_ml.joblib和你最喜欢的scikit-learn估算器
· 对于大型数据集,请使用dask_ml估算器。
a)预处理:
dask_ml.preprocessing包含一些sklearn的一些功能,如RobustScalar(稳健标量),StandardScalar(标准标量),LabelEncoder(标签编码器),OneHotEncoder(独热编码),PolynomialFeatures(多项式特性)等等,以及它的一些自己的如Categorizer(分类器),DummyEncoder(虚拟编码),OrdinalEncoder(序数编码器)等。
你可以像使用PandasDataFrame一样使用它们。
from dask_ml.preprocessing import RobustScalardf = da.read_csv("BigFile.csv", chunks=50000)rsc = RobustScalardf["column"] = rsc.fit_transform(df["column"])
你可以使用Dask的DataFrame上的预处理方法,从Sklearn的Make_pipeline方法生成一个管道。
b)超参数搜索:
Dask具有sklearn用于进行超参数搜索的方法,例如GridSearchCV,RandomizedSearchCV等等。
from dask_ml.datasets import make_regressionfrom dask_ml.model_selection import train_test_split, GridSearchCVX, y = make_regression(chunks=50000)xtr, ytr, xval, yval = test_train_split(X, y)gsearch = GridSearchCV(estimator, param_grid, cv=10)gsearch.fit(xtr, ytr)
而且,如果要partial_fit与估算器一起使用,则可以使用dask-ml的IncrementalSearchCV。
注意:(来自Dask)如果要使用后拟合任务(如评分和预测),则使用基础估计量评分方法。如果你的估算器(可能来自sklearn )无法处理大型数据集,则将估算器包装在" dask_ml.wrappers.ParallelPostFit" 周围。它可以并行化" predict"," predict_proba"," transform"等方法。
c)模型/估计器:
Dask具有一些线性模型(的LinearRegression,LogisticRegression等),一些聚类模型(Kmeans和SpectralClustering),一种使用Tensorflow 的方法,使用Dask训练XGBoost模型的方法。
如果训练数据较小,则可以将sklearn的模型与结合使用Dask,如果ParallelPostFit包装数据较大,则可以与包装器一起使用(如果测试数据较大)。
from sklearn.linear_model import ElasticNetfrom dask_ml.wrappers import ParallelPostFitel = ParallelPostFit(estimator=ElasticNet)el.fit(Xtrain, ytrain)preds = el.predict(Xtest)
如果数据集不大但模型很大,则可以使用joblib。sklearns编写了许多用于并行执行的算法(你可能使用过n_jobs=-1参数),joblib该算法利用线程和进程来并行化工作负载。要用于Dask并行化,你可以创建一个Client(客户端)(必须),然后使用with joblib.parallel_backend('dask'):包装代码。
import dask_ml.joblibfrom sklearn.externals import joblibclient = Clientwith joblib.parallel_backend('dask'): # 你的 scikit-learn 代码
注意:DASK JOBLIB后端对于扩展CPU绑定的工作负载非常有用; 在RAM中包含数据集的工作负载,但具有许多可以并行完成的单独操作。要扩展到受RAM约束的工作负载(大于内存的数据集),你应该使用Dask的内置模型和方法。
而且,如果你训练的数据太大而无法容纳到内存中,那么你应该使用Dask的内置估算器来加快速度。你也可以使用Dask的wrapper.Incremental它使用基础估算器的partial_fit方法对整个数据集进行训练,但实际上是连续的。
Dask的内置估计器很好地扩展用于大型数据集与多种优化算法,如admm,lbfgs,gradient_descent等,并且正则化器如 L1,L2,ElasticNet等。
from dask_ml.linear_model import LogisticRegressionlr = LogisticRegressionlr.fit(X, y, solver="lbfgs")
经过4期的内容讲解,你学会加快Python算法的四种方法了么?
数据分析咨询请扫描二维码
在准备数据分析师面试时,掌握高频考题及其解答是应对面试的关键。为了帮助大家轻松上岸,以下是10个高频考题及其详细解析,外加 ...
2024-12-20互联网数据分析师是一个热门且综合性的职业,他们通过数据挖掘和分析,为企业的业务决策和运营优化提供强有力的支持。尤其在如今 ...
2024-12-20在现代商业环境中,数据分析师是不可或缺的角色。他们的工作不仅仅是对数据进行深入分析,更是协助企业从复杂的数据信息中提炼出 ...
2024-12-20随着大数据时代的到来,数据驱动的决策方式开始受到越来越多企业的青睐。近年来,数据分析在人力资源管理中正在扮演着至关重要的 ...
2024-12-20在数据分析的世界里,表面上的技术操作只是“入门票”,而真正的高手则需要打破一些“看不见的墙”。这些“隐形天花板”限制了数 ...
2024-12-19在数据分析领域,尽管行业前景广阔、岗位需求旺盛,但实际的工作难度却远超很多人的想象。很多新手初入数据分析岗位时,常常被各 ...
2024-12-19入门数据分析,许多人都会感到“难”,但这“难”究竟难在哪儿?对于新手而言,往往不是技术不行,而是思维方式、业务理解和实践 ...
2024-12-19在如今的行业动荡背景下,数据分析师的职业前景虽然面临一些挑战,但也充满了许多新的机会。随着技术的不断发展和多领域需求的提 ...
2024-12-19在信息爆炸的时代,数据分析师如同探险家,在浩瀚的数据海洋中寻觅有价值的宝藏。这不仅需要技术上的过硬实力,还需要一种艺术家 ...
2024-12-19在当今信息化社会,大数据已成为各行各业不可或缺的宝贵资源。大数据专业应运而生,旨在培养具备扎实理论基础和实践能力,能够应 ...
2024-12-19阿里P8、P9失业都找不到工作?是我们孤陋寡闻还是世界真的已经“癫”成这样了? 案例一:本硕都是 985,所学的专业也是当红专业 ...
2024-12-19CDA持证人Louis CDA持证人基本情况 我大学是在一个二线城市的一所普通二本院校读的,专业是旅游管理,非计算机非统计学。毕业之 ...
2024-12-18最近,知乎上有个很火的话题:“一个人为何会陷入社会底层”? 有人说,这个世界上只有一个分水岭,就是“羊水”;还有人说,一 ...
2024-12-18在这个数据驱动的时代,数据分析师的技能需求快速增长。掌握适当的编程语言不仅能增强分析能力,还能帮助分析师从海量数据中提取 ...
2024-12-17在当今信息爆炸的时代,数据分析已经成为许多行业中不可或缺的一部分。想要在这个领域脱颖而出,除了热情和毅力外,你还需要掌握 ...
2024-12-17数据分析,是一项通过科学方法处理数据以获取洞察并支持决策的艺术。无论是在商业环境中提升业绩,还是在科研领域推动创新,数据 ...
2024-12-17在数据分析领域,图表是我们表达数据故事的重要工具。它们不仅让数据变得更加直观,也帮助我们更好地理解数据中的趋势和模式。相 ...
2024-12-16在当今社会,我们身处着一个飞速发展、变化迅猛的时代。不同行业在科技进步、市场需求和政策支持的推动下蓬勃发展,呈现出令人瞩 ...
2024-12-16在现代商业世界中,数据分析师扮演着至关重要的角色。他们通过解析海量数据,为企业战略决策提供有力支持。要有效完成这项任务, ...
2024-12-16在当今数据爆炸的时代,数据分析师是组织中不可或缺的导航者。他们通过从大量数据中提取可操作的洞察力,帮助企业在竞争激烈的市 ...
2024-12-16