分布式编程的难点在于:
1.服务器之间的通信,主节点如何了解从节点的执行进度,并在从节点之间进行负载均衡和任务调度;
2.如何让多个服务器上的进程访问同一资源的不同部分进行执行
第一部分涉及到网络编程的底层细节
第二个问题让我联想到hdfs的一些功能。
首先分布式进程还是解决的是单机单进程无法处理的大数据量大计算量的问题,希望能加通过一份代码(最多主+从两份)来并行执行一个大任务。
这就面临两个问题,首先将程序分布到多台服务器,其次将输入数据分配给多台服务器。
第一个问题相对比较简单,毕竟程序一般不会太长,即便是超级jar包的spark程序,也不过百兆。
但数据里不同,如今企业级别的数据动辄GB、TB,如果在分布式程序执行之前首先要进行大容量数据的转移,显然是不可取的。
这时候我们就需要一个中央共享数据源,所有服务器都可以对这个数据源进行并行存取(块block),这就已经非常接近hdfs的功能。
因为在hdfs中,集群中的多台服务器共享同一个hdfs,每台机器访问hdfs就像访问本地数据一样(还是稍微慢一点);
计算任务执行完之后,每台服务器还可以将自己的计算结果写回hdfs,每台服务器的结果被存储成了结果目录中的小文件。
# task_master.py import random, time, queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = queue.Queue() # 接收结果的队列: result_queue = queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')
# task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
标签:
python,分布式,编程
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件!
如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
白云城资源网 Copyright www.dyhadc.com
暂无“python分布式编程实现过程解析”评论...
更新日志
2024年11月13日
2024年11月13日
- 刘欢《雨中的树(新歌加精选)2CD》德国HD24K金碟[WAV+CUE]
- 郑源 《世间情歌》6N纯银SQCD[WAV+CUE][1G]
- 群星《粤潮2HQII》头版限量编号[低速原抓WAV+CUE][991M]
- 群星《2023好听新歌21》十倍音质 U盘音乐[WAV分轨][1G]
- 《热血传奇》双11感恩回馈 超值狂欢30天
- 原神5.2版本活动汇总 5.2版本活动有哪些
- 张敬轩.2010-NO.ELEVEN【环球】【WAV+CUE】
- 黄丽玲.2006-失恋无罪【艾回】【WAV+CUE】
- 阿达娃.2024-Laluna【W8VES】【FLAC分轨】
- 宝可梦大集结段位等级划分表大全 大集结段位一览
- 龙腾世纪影障守护者工坊与装备如何升级 工坊与装备升级说明
- 龙腾世纪影障守护者全成就攻略分享 龙腾世纪4全成就列表一览
- 《剑星》更新四套全新战衣!
- 卡普空老将伊津野英昭宣布入职腾讯光子 开发3A动作
- 38岁梅根·福克斯官宣怀孕:将迎来第四个孩子