京公网安备 11010802034615号
经营许可证编号:京B2-20210330
详解Python实现多进程异步事件驱动引擎
本篇文章主要介绍了详解Python实现多进程异步事件驱动引擎,小编觉得挺不错的,现在分享给大家,也给大家做个参考。
多进程异步事件驱动逻辑
逻辑
code
# -*- coding: utf-8 -*-
'''
author: Jimmy
contact: 234390130@qq.com
file: eventEngine.py
time: 2017/8/25 上午10:06
description: 多进程异步事件驱动引擎
'''
__author__ = 'Jimmy'
from multiprocessing import Process, Queue
class EventEngine(object):
# 初始化事件事件驱动引擎
def __init__(self):
#保存事件列表
self.__eventQueue = Queue()
#引擎开关
self.__active = False
#事件处理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]}
self.__handlers = {}
#保存事件处理进程池
self.__processPool = []
#事件引擎主进程
self.__mainProcess = Process(target=self.__run)
#执行事件循环
def __run(self):
while self.__active:
#事件队列非空
if not self.__eventQueue.empty():
#获取队列中的事件 超时1秒
event = self.__eventQueue.get(block=True ,timeout=1)
#执行事件
self.__process(event)
else:
# print('无任何事件')
pass
#执行事件
def __process(self, event):
if event.type in self.__handlers:
for handler in self.__handlers[event.type]:
#开一个进程去异步处理
p = Process(target=handler, args=(event, ))
#保存到进程池
self.__processPool.append(p)
p.start()
#开启事件引擎
def start(self):
self.__active = True
self.__mainProcess.start()
#暂停事件引擎
def stop(self):
"""停止"""
# 将事件管理器设为停止
self.__active = False
# 等待事件处理进程退出
for p in self.__processPool:
p.join()
self.__mainProcess.join()
#终止事件引擎
def terminate(self):
self.__active = False
#终止所有事件处理进程
for p in self.__processPool:
p.terminate()
self.__mainProcess.join()
#注册事件
def register(self, type, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type]
except KeyError:
handlerList = []
self.__handlers[type] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
def unregister(self, type, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type]
except KeyError:
pass
def sendEvent(self, event):
#发送事件 像队列里存入事件
self.__eventQueue.put(event)
class Event(object):
#事件对象
def __init__(self, type =None):
self.type = type
self.dict = {}
#测试
if __name__ == '__main__':
import time
EVENT_ARTICAL = "Event_Artical"
# 事件源 公众号
class PublicAccounts:
def __init__(self, eventManager):
self.__eventManager = eventManager
def writeNewArtical(self):
# 事件对象,写了新文章
event = Event(EVENT_ARTICAL)
event.dict["artical"] = u'如何写出更优雅的代码\n'
# 发送事件
self.__eventManager.sendEvent(event)
print(u'公众号发送新文章\n')
# 监听器 订阅者
class ListenerTypeOne:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章' % self.__username)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
class ListenerTypeTwo:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章 睡3秒再看' % self.__username)
time.sleep(3)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
def test():
listner1 = ListenerTypeOne("thinkroom") # 订阅者1
listner2 = ListenerTypeTwo("steve") # 订阅者2
ee = EventEngine()
# 绑定事件和监听器响应函数(新文章)
ee.register(EVENT_ARTICAL, listner1.ReadArtical)
ee.register(EVENT_ARTICAL, listner2.ReadArtical)
for i in range(0, 20):
listner3 = ListenerTypeOne("Jimmy") # 订阅者X
ee.register(EVENT_ARTICAL, listner3.ReadArtical)
ee.start()
#发送事件
publicAcc = PublicAccounts(ee)
publicAcc.writeNewArtical()
test()
以上就是本文的全部内容,希望对大家的学习有所帮助.
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在数据科学的工具箱中,析因分析(Factor Analysis, FA)、聚类分析(Clustering Analysis)与主成分分析(Principal Component ...
2025-12-18自2017年《Attention Is All You Need》一文问世以来,Transformer模型凭借自注意力机制的强大建模能力,在NLP、CV、语音等领域 ...
2025-12-18在CDA(Certified Data Analyst)数据分析师的时间序列分析工作中,常面临这样的困惑:某电商平台月度销售额增长20%,但增长是来 ...
2025-12-18在机器学习实践中,“超小数据集”(通常指样本量从几十到几百,远小于模型参数规模)是绕不开的场景——医疗领域的罕见病数据、 ...
2025-12-17数据仓库作为企业决策分析的“数据中枢”,其价值完全依赖于数据质量——若输入的是缺失、重复、不一致的“脏数据”,后续的建模 ...
2025-12-17在CDA(Certified Data Analyst)数据分析师的日常工作中,“随时间变化的数据”无处不在——零售企业的每日销售额、互联网平台 ...
2025-12-17在休闲游戏的运营体系中,次日留存率是当之无愧的“生死线”——它不仅是衡量产品核心吸引力的首个关键指标,更直接决定了后续LT ...
2025-12-16在数字化转型浪潮中,“以用户为中心”已成为企业的核心经营理念,而用户画像则是企业洞察用户、精准决策的“核心工具”。然而, ...
2025-12-16在零售行业从“流量争夺”转向“价值深耕”的演进中,塔吉特百货(Target)以两场标志性实践树立了行业标杆——2000年后的孕妇精 ...
2025-12-15在统计学领域,二项分布与卡方检验是两个高频出现的概念,二者都常用于处理离散数据,因此常被初学者混淆。但本质上,二项分布是 ...
2025-12-15在CDA(Certified Data Analyst)数据分析师的工作链路中,“标签加工”是连接原始数据与业务应用的关键环节。企业积累的用户行 ...
2025-12-15在Python开发中,HTTP请求是与外部服务交互的核心场景——调用第三方API、对接微服务、爬取数据等都离不开它。虽然requests库已 ...
2025-12-12在数据驱动决策中,“数据波动大不大”是高频问题——零售店长关心日销售额是否稳定,工厂管理者关注产品尺寸偏差是否可控,基金 ...
2025-12-12在CDA(Certified Data Analyst)数据分析师的能力矩阵中,数据查询语言(SQL)是贯穿工作全流程的“核心工具”。无论是从数据库 ...
2025-12-12很多小伙伴都在问CDA考试的问题,以下是结合 2025 年最新政策与行业动态更新的 CDA 数据分析师认证考试 Q&A,覆盖考试内容、报考 ...
2025-12-11在Excel数据可视化中,柱形图因直观展示数据差异的优势被广泛使用,而背景色设置绝非简单的“换颜色”——合理的背景色能突出核 ...
2025-12-11在科研实验、商业分析或医学研究中,我们常需要判断“两组数据的差异是真实存在,还是偶然波动”——比如“新降压药的效果是否优 ...
2025-12-11在CDA(Certified Data Analyst)数据分析师的工作体系中,数据库就像“数据仓库的核心骨架”——所有业务数据的存储、组织与提 ...
2025-12-11在神经网络模型搭建中,“最后一层是否添加激活函数”是新手常困惑的关键问题——有人照搬中间层的ReLU激活,导致回归任务输出异 ...
2025-12-05在机器学习落地过程中,“模型准确率高但不可解释”“面对数据噪声就失效”是两大核心痛点——金融风控模型若无法解释决策依据, ...
2025-12-05