CDA数据分析师 出品
相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下python的计算时间,减少大家在算法上的等待时间。今天给大家讲述最后一方面的内容,关于Dask的方法运用。
随着对机器学习算法并行化的需求不断增加,由于数据大小甚至模型大小呈指数级增长,如果我们拥有一个工具,可以帮助我们并行化处理Pandas的DataFrame,可以并行化处理Numpy的计算,甚至并行化我们的机器学习算法(可能是来自sklearn和Tensorflow的算法)也没有太多的麻烦,那它对我们会非常有帮助。
好消息是确实存在这样的库,其名称为Dask。Dask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。
他们也有一个单独的机器学习库dask-ml,这与如现有的库集成如sklearn,xgboost和tensorflow。
Dask通过绘制任务之间的交互图来并行化分配给它的任务。使用Dask的.visualize方法来可视化你的工作将非常有帮助,该方法可用于所有数据类型以及你计算的复杂任务链。此方法将输出你的任务图,并且如果你的任务在每个级别具有多个节点(即,你的任务链结构在多个层次上具有许多独立的任务,例如数据块上的并行任务),然后Dask将能够并行化它们。
注意: Dask仍是一个相对较新的项目。它还有很长的路要走。不过,如果你不想学习全新的API(例如PySpark),Dask是你的最佳选择,将来肯定会越来越好。Spark / PySpark仍然遥遥领先,并且仍将继续改进。这是一个完善的Apache项目。
Dask中的每种数据类型都提供现有数据类型的分布式版本,例如pandas中的DataFrame、numpy中的ndarray和Python中的list。这些数据类型可以大于你的内存,Dask将以Blocked方式对数据并行(y)运行计算。Blocked从某种意义上说,它们是通过执行许多小的计算(即,以块为单位)来执行大型计算的,而块的数量为chunks的总数。
a)数组:
网格中的许多Numpy数组作为Dask数组
Dask Array对非常大的数组进行操作,将它们划分为块并并行执行这些块。它有许多可用的numpy方法,你可以使用这些方法来加快速度。但是其中一些没有实现。
只要支持numpy切片,Dask Array就可以从任何类似数组结构中读取数据,并且可以通过使用并且通过使用Dask . Array .from_array方法具有.shape属性。它还可以读取.npy和.zarr文件。
import dask.array as daimport numpy as nparr = numpy.random.randint(1, 1000, (10000, 10000))darr = da.from_array(arr, chunks=(1000, 1000))# 它会生成大小为(1000,1000)的块darr.npartitioins# 100
当你的数组真的很重时(即它们无法放入内存)并且numpy对此无能为力时,可以使用它。因此,Dask将它们分为数组块并为你并行处理它们。
现在,Dask对每种方法进行惰性评估。因此,要实际计算函数的值,必须使用.compute方法。它将以块为单位并行计算结果,同时并行化每个独立任务。
result = darr.compute
1)元素数量较少时,Numpy比Dask快;2)Dask接管了Numpy,耗时约1e7个元素;3)Numpy无法产生更多元素的结果,因为它无法将它们存储在内存中。
b)DataFrame:
5个Pandas DataFrame在一个Dask DataFrame中提供每月数据(可以来自diff文件)
与Dask Arrays相似,Dask DataFrames通过将文件划分为块并将这些块的计算函数并行化,从而对不适合内存非常大的数据文件进行并行计算。
import dask.dataframe as dddf = dd.read_csv("BigFile(s).csv", blocksize=50e6)
现在,你可以应用/使用pandas库中可用的大多数功能,并在此处应用。
agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])agg.columns = new_column_names #请查看notebookdf_new = df.merge(agg.reset_index, on="column", how="left")df_new.compute.head
c)Bag:
Dask Bag包并行处理包含多个数据类型元素Python的list相似对象。当你尝试处理一些半结构化数据(例如JSON Blob或日志文件)时,此功能很有用。
import dask.bag as dbb = db.from_txt("BigSemiStructuredData.txt")b.take(1)
Daskbag逐行读取,.take方法输出指定行数的元组。
Dask Bag在这样的Python对象集合上实现例如map,filter,fold,和groupby等操作。它使用Python迭代器并行地完成这个任务,占用的内存很小。它类似于PyToolz的并行版本或PySpark RDD的Python版本。
filtered = b.filter(lambda x: x["Name"]=="James")\ .map(lambda x: x["Address"] = "New_Address")filtered.compute
如果你的任务有点简单,并且你不能或不想使用这些高级集合来执行操作,则可以使用低级调度程序,该程序可帮助你使用dask.delayed接口并行化代码/算法。dask.delayed也可以进行延迟计算。
import dask.delayed as delay@delaydef sq(x): return x**2@delay def add(x, y): return x+y@delay def sum(arr): sum=0 for i in range(len(arr)): sum+=arr[i] return sum
你可以根据需要在这些函数之间添加复杂的交互,使用上一个任务的结果作为下一个任务的参数。Dask不会立即计算这些函数,而是会为你的任务绘制图形,有效地合并你使用的函数之间的交互。
inputs = list(np.arange(1, 11))#将外接程序 dask.delayed 加入到列表temp = for i in range(len(inputs)): temp.append(sq(inputs[i])) # 计算输入的sq并保存 # 延迟计算在列表inputs=temp; temp = for i in range(0, len(inputs)-1, 2): temp.append(add(inputs[i]+inputs[i+1])) # 添加两个连续 # 结果从prev步骤inputs = tempresult = sum(inputs) # 将所有prev步骤的结果相加results.compute
你可以将延迟添加到具有许多可能的小块的任何可并行化代码中,从而获得加速的效果。它可以是你想计算的许多函数,例如上面的示例,或者可以使用并行读取多个文件pandas.read_csv。
首先,到目前为止,我们一直使用Dask的默认调度器来计算任务的结果。但是你可以根据需要从Dask提供的选项中更改它们。
Dask 带有四个可用的调度程序:
· threaded:由线程池支持的调度程序
· processes:由进程池支持的调度程序
· single-threaded(又名" sync"):同步调度程序,用于调试
· distributed:用于在多台计算机上执行图形的分布式调度程序
result.compute(scheduler="single-threaded") #用于调试# 或者dask.config.set(scheduler="single-threaded")result.compute#注:(从官方网页)#当被称为GIL的函数释放时,线程任务将工作得很好,而多处理总是启动时间较慢,并且在任务之间需要大量的通信。# 你可以通过其中一个得到调度程序 commands:dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync#单线程的最后一个
但是,Dask还有一个调度器,dask.distributed由于以下原因它可能是首选使用的:
1. 它提供了异步API的访问,尤其是Future,
1. 它提供了一个诊断仪表板,可以提供有关性能和进度的宝贵见解
1. 它可以更复杂地处理数据位置,因此在需要多个流程的工作负载上,它比多处理调度程序更有效。
你可以创建一个Dask的dask.distributed调度程序,通过导入和创建客户端实现分布式调度器
from dask.distributed import Clientclient = Client # Set up a local cluster# 你可以导航到http://localhost:8787/status 查看# 诊断仪表板,如果你有Bokeh安装的话
现在,你可以使用client.submit方法,将函数和参数作为其参数,从而将任务提交到此集群。然后我们可以使用client.gather或.result方法来收集结果。
sent = client.submit(sq, 4) # sq: square 函数result = client.gather(sent) # 或者 sent.result
你也可以仅使用dask.distributed.progress来查看当前单元格中任务的进度。你还可以明确选择使用dask.distributed.wait来等待任务完成。
Note: (Local Cluster)有时您会注意到Dask正在超出内存使用,即使它正在划分任务。它可能发生在您身上,因为您试图在数据集上使用的函数需要您的大部分数据进行处理,而多重处理可能使情况变得更糟,因为所有工作人员都可能试图将数据集复制到内存中。这可能发生在聚合的情况下。或者您可能想限制Dask只使用特定数量的内存。
在这些情况下,您可以使用Dask.distributed。LocalCluster参数,并将它们传递给Client,从而使用本地机器的核心构建LocalCluster。
from dask.distributed import Client, LocalClusterclient = Client(n_workers=1, threads_per_worker=1, processes=False, memory_limit='25GB', scheduler_port=0, silence_logs=True, diagnostics_port=0)client
'scheduler_port=0'和' stics_port=0'将为这个特定的客户端选择随机端口号。在'process =False'的情况下,dask的客户端不会复制数据集,这可能发生在您所创建的每个进程中。您可以根据自己的需要或限制对客户机进行调优,要了解更多信息,可以查看LocalCluster的参数。您还可以在同一台机器的不同端口上使用多个客户机。
Dask也有一个库,可以帮助并允许大多数流行的机器学习库,例如sklearn,tensorflow和xgboost。
在机器学习中,你可能会遇到几个不同的问题。而具体的策略取决于你面临的问题:
1. 大型模型:数据适合放入RAM,但是训练时间太长。许多超参数组合,许多模型的大型集合等。
1. 大型数据集:数据大于RAM,并且不能选择抽样。
因此,你应该:
· 对于内存中适合的问题,只需使用scikit-learn(或你最喜欢的ML库)即可;
· 对于大型模型,请使用dask_ml.joblib和你最喜欢的scikit-learn估算器
· 对于大型数据集,请使用dask_ml估算器。
a)预处理:
dask_ml.preprocessing包含一些sklearn的一些功能,如RobustScalar(稳健标量),StandardScalar(标准标量),LabelEncoder(标签编码器),OneHotEncoder(独热编码),PolynomialFeatures(多项式特性)等等,以及它的一些自己的如Categorizer(分类器),DummyEncoder(虚拟编码),OrdinalEncoder(序数编码器)等。
你可以像使用PandasDataFrame一样使用它们。
from dask_ml.preprocessing import RobustScalardf = da.read_csv("BigFile.csv", chunks=50000)rsc = RobustScalardf["column"] = rsc.fit_transform(df["column"])
你可以使用Dask的DataFrame上的预处理方法,从Sklearn的Make_pipeline方法生成一个管道。
b)超参数搜索:
Dask具有sklearn用于进行超参数搜索的方法,例如GridSearchCV,RandomizedSearchCV等等。
from dask_ml.datasets import make_regressionfrom dask_ml.model_selection import train_test_split, GridSearchCVX, y = make_regression(chunks=50000)xtr, ytr, xval, yval = test_train_split(X, y)gsearch = GridSearchCV(estimator, param_grid, cv=10)gsearch.fit(xtr, ytr)
而且,如果要partial_fit与估算器一起使用,则可以使用dask-ml的IncrementalSearchCV。
注意:(来自Dask)如果要使用后拟合任务(如评分和预测),则使用基础估计量评分方法。如果你的估算器(可能来自sklearn )无法处理大型数据集,则将估算器包装在" dask_ml.wrappers.ParallelPostFit" 周围。它可以并行化" predict"," predict_proba"," transform"等方法。
c)模型/估计器:
Dask具有一些线性模型(的LinearRegression,LogisticRegression等),一些聚类模型(Kmeans和SpectralClustering),一种使用Tensorflow 的方法,使用Dask训练XGBoost模型的方法。
如果训练数据较小,则可以将sklearn的模型与结合使用Dask,如果ParallelPostFit包装数据较大,则可以与包装器一起使用(如果测试数据较大)。
from sklearn.linear_model import ElasticNetfrom dask_ml.wrappers import ParallelPostFitel = ParallelPostFit(estimator=ElasticNet)el.fit(Xtrain, ytrain)preds = el.predict(Xtest)
如果数据集不大但模型很大,则可以使用joblib。sklearns编写了许多用于并行执行的算法(你可能使用过n_jobs=-1参数),joblib该算法利用线程和进程来并行化工作负载。要用于Dask并行化,你可以创建一个Client(客户端)(必须),然后使用with joblib.parallel_backend('dask'):包装代码。
import dask_ml.joblibfrom sklearn.externals import joblibclient = Clientwith joblib.parallel_backend('dask'): # 你的 scikit-learn 代码
注意:DASK JOBLIB后端对于扩展CPU绑定的工作负载非常有用; 在RAM中包含数据集的工作负载,但具有许多可以并行完成的单独操作。要扩展到受RAM约束的工作负载(大于内存的数据集),你应该使用Dask的内置模型和方法。
而且,如果你训练的数据太大而无法容纳到内存中,那么你应该使用Dask的内置估算器来加快速度。你也可以使用Dask的wrapper.Incremental它使用基础估算器的partial_fit方法对整个数据集进行训练,但实际上是连续的。
Dask的内置估计器很好地扩展用于大型数据集与多种优化算法,如admm,lbfgs,gradient_descent等,并且正则化器如 L1,L2,ElasticNet等。
from dask_ml.linear_model import LogisticRegressionlr = LogisticRegressionlr.fit(X, y, solver="lbfgs")
经过4期的内容讲解,你学会加快Python算法的四种方法了么?
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在数据分析领域,Excel作为一种普及率极高且功能强大的工具,无疑为无数专业人士提供了便捷的解决方案。尽管Excel自带了丰富的功 ...
2025-01-17在这个瞬息万变的时代,许多人都在寻找能让他们脱颖而出的职业。而数据分析师,作为大数据和人工智能时代的热门职业,自然吸引了 ...
2025-01-14Python作为一门功能强大的编程语言,已经成为数据分析和可视化领域的重要工具。无论你是数据分析的新手,还是经验丰富的专业人士 ...
2025-01-10完全靠数据决策,真的靠谱吗? 最近几年,“数据驱动”成了商界最火的关键词之一,但靠数据就能走天下?其实不然!那些真正成功 ...
2025-01-09SparkSQL 结构化数据处理流程及原理是什么?Spark SQL 可以使用现有的Hive元存储、SerDes 和 UDF。它可以使用 JDBC/ODB ...
2025-01-09在如今这个信息爆炸的时代,数据已然成为企业的生命线。无论是科技公司还是传统行业,数据分析正在深刻地影响着商业决策以及未来 ...
2025-01-08“数据为王”相信大家都听说过。当前,数据信息不再仅仅是传递的媒介,它成为了驱动经济发展的新燃料。对于企业而言,数据指标体 ...
2025-01-07在职场中,当你遇到问题的时候,如果感到无从下手,或者抓不到重点,可能是因为你掌握的思维模型不够多。 一个好用的思维模型, ...
2025-01-06在现代企业中,数据分析师扮演着至关重要的角色。每天都有大量数据涌入,从社交媒体到交易平台,数据以空前的速度和规模生成。面 ...
2025-01-06在职场中,许多言辞并非表面意思那么简单,有时需要听懂背后的“潜台词”。尤其在数据分析的领域里,掌握常用术语就像掌握一门新 ...
2025-01-04在当今信息化社会,数据分析已成为各行各业的核心驱动力。它不仅仅是对数字进行整理与计算,而是在数据的海洋中探寻规律,从而指 ...
2025-01-03又到一年年终时,各位打工人也迎来了展示成果的关键时刻 —— 年终述职。一份出色的年终述职报告,不仅能全面呈现你的工作价值, ...
2025-01-03在竞争激烈的商业世界中,竞品分析对于企业的发展至关重要。今天,我们就来详细聊聊数据分析师写竞品分析的那些事儿。 一、明确 ...
2025-01-03在数据分析的江湖里,有两个阵营总是争论不休。一派信奉“大即是美”,认为数据越多越好;另一派坚守“小而精”,力挺质量胜于规 ...
2025-01-02数据分析是一个复杂且多维度的过程,从数据收集到分析结果应用,每一步都是对信息的提炼与升华。可视化分析结果,以图表的形式展 ...
2025-01-02在当今的数字化时代,数据分析师扮演着一个至关重要的角色。他们如同现代企业的“解密专家”,通过解析数据为企业提供决策支持。 ...
2025-01-02数据分析报告至关重要 一份高质量的数据分析报告不仅能够揭示数据背后的真相,还能为企业决策者提供有价值的洞察和建议。 年薪 ...
2024-12-31数据分析,听起来好像是技术大咖的专属技能,但其实是一项人人都能学会的职场硬核能力!今天,我们来聊聊数据分析的核心流程,拆 ...
2024-12-31提到数据分析,你脑海里可能会浮现出一群“数字控”抱着电脑,在海量数据里疯狂敲代码的画面。但事实是,数据分析并没有你想象的 ...
2024-12-31关于数据分析师是否会成为失业高危职业,近年来的讨论层出不穷。在这个快速变化的时代,技术进步让人既兴奋又不安。今天,我们从 ...
2024-12-30