Multi-platform unified management software interface: how to implement a multi-platform unified management software interface
2022-08-31
Python package drmaa: cluster job management
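Contents
1. Introduction to drmaa
2. Installation and configuration
3. Examples
  3.1 Starting and stopping a session
  3.2 Running a job
  3.3 Waiting for a job
  3.4 Controlling a job
  3.5 Querying job status
4. Applications
  4.1 Writing a simple application
  4.2 Application example 2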
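When building a pipeline, we first write every module script and then write a main program that chains the modules together. How do we then submit the tasks to the cluster automatically, one after another (or in parallel), so that each step starts on its own once the previous one has finished? We could set a flag in a script and check repeatedly for it before running the next step. That approach is primitive and has many drawbacks: it wastes memory, it cannot run steps in parallel, and it fails in unpredictable ways. Is there a tool for managing cluster job submission? Yes. Python's drmaa package does this.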
1. Introduction to drmaa
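Distributed Resource Management Application API (DRMAA) is a high-level Open Grid Forum (OGF) API specification for submitting and controlling jobs on distributed resource management (DRM) systems such as clusters and compute grids. Its scope covers all the high-level functionality an application needs to submit, control, and monitor jobs on the execution resources of a DRM system. DRMAA is implemented in job schedulers such as Sun Grid Engine (SGE) and Condor. For more on SGE, see my earlier post on the SGE cluster job scheduling system.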
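C, C++, Perl, Python and other languages all have drmaa bindings for managing jobs on SGE clusters. This post covers drmaa-python (GitHub: drmaa-python; PyPI: drmaa).

2. Installation and configuration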
Requirements: Python 2.7+ and a DRMAA-compatible cluster such as SGE.
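```shell
# Install
pip install drmaa

# Set the SGE paths
export SGE_ROOT=/path/to/gridengine   # SGE installation directory
export SGE_CELL=default

# Point drmaa-python at the C library
# libdrmaa.so.1.0 is the DRMAA C shared library, part of the libdrmaa-dev package
export DRMAA_LIBRARY_PATH=/usr/lib/libdrmaa.so.1.0
```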
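It can help to fail early, before importing drmaa, if any of the variables above is missing. The following is a minimal sketch that checks the three variable names used above. `missing_drmaa_env` is a hypothetical helper and is not part of drmaa-python.

```python
import os


def missing_drmaa_env(env=None):
    """Return the required DRMAA/SGE variables that are unset or empty.

    Hypothetical helper: it only checks the variables used in this post and
    that DRMAA_LIBRARY_PATH points at an existing file.
    """
    env = os.environ if env is None else env
    required = ["SGE_ROOT", "SGE_CELL", "DRMAA_LIBRARY_PATH"]
    missing = [name for name in required if not env.get(name)]
    lib = env.get("DRMAA_LIBRARY_PATH")
    if lib and not os.path.isfile(lib):
        missing.append("DRMAA_LIBRARY_PATH (file not found: %s)" % lib)
    return missing
```

A pipeline could call `missing_drmaa_env()` at startup and abort with the returned list before `import drmaa` fails with a less specific error.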
3. Examples
3.1 Starting and stopping a session
Session
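```python
#!/usr/bin/env python
import drmaa


def main():
    """Create a drmaa session and exit"""
    with drmaa.Session() as s:  # the session is initialized automatically and organizes job submission
        print('A session was started successfully')
    # Leaving the with block calls exit() automatically. Most calls, such as
    # runJob() and wait(), must happen before exit(); a few queries such as
    # the session contact string (getContact in the C API) still work afterwards.


if __name__ == '__main__':
    main()
```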
A reconnectable session lets you initialize the DRMAA library into a previous session, so the library can access that session's job list.
```python
#!/usr/bin/env python
import drmaa


def main():
    """
    Create a session, show that each session has an ID, use session ID to
    disconnect, then reconnect. Finally, exit.
    """
    s = drmaa.Session()
    s.initialize()
    print('A session was started successfully')
    response = s.contact
    print('session contact returns: %s' % response)
    s.exit()
    print('Exited from session')

    s.initialize(response)  # re-initialize into the previous session
    print('Session was restarted successfully')
    s.exit()


if __name__ == '__main__':
    main()
```
3.2 Running a job
Suppose the current directory contains a script, sleeper.sh, that takes two arguments:
```bash
#!/bin/bash
echo "Hello world, the answer is $1"
sleep 3s
echo "$2 Bye world!"
```
Submit sleeper.sh to SGE with drmaa:
```python
#!/usr/bin/env python
import drmaa
import os


def main():
    """
    Submit a job.
    Note, need file called sleeper.sh in current directory.
    """
    with drmaa.Session() as s:
        print('Creating job template')
        jt = s.createJobTemplate()  # allocate a job template (the structure holding submission info)
        # remoteCommand is the program to run. Relative paths resolve against the
        # user's home directory by default; set workingDirectory to change that.
        jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
        jt.args = ['42', 'Simon says:']  # arguments passed to the script
        jt.joinFiles = True  # merge stderr into stdout

        jobid = s.runJob(jt)  # runJob() returns the ID assigned to the job
        print('Your job has been submitted with ID %s' % jobid)

        # jobid = s.runBulkJobs(jt, 1, 30, 2)  # submit an array job instead
        # print('Your jobs have been submitted with IDs %s' % jobid)

        print('Cleaning up')
        # Frees the DRMAA memory held by the template; submitted jobs are unaffected
        s.deleteJobTemplate(jt)


if __name__ == '__main__':
    main()
```
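The commented-out `runBulkJobs(jt, 1, 30, 2)` call submits an array job whose task indices run from 1 to 30 with a step of 2. On SGE, each array task can read its own index from the `SGE_TASK_ID` environment variable and use it to pick its input. The sketch below only illustrates that index arithmetic and a worker-side lookup. `bulk_task_ids`, `pick_input`, and the input names are hypothetical and do not call drmaa.

```python
import os


def bulk_task_ids(begin, end, step):
    """Task indices of an array job submitted as runBulkJobs(jt, begin, end, step)."""
    return list(range(begin, end + 1, step))


def pick_input(inputs, env=None):
    """Worker-side helper: choose this task's input from SGE_TASK_ID (1-based)."""
    env = os.environ if env is None else env
    task_id = int(env["SGE_TASK_ID"])
    return inputs[task_id - 1]


ids = bulk_task_ids(1, 30, 2)
print(len(ids), ids[:3], ids[-1])  # 15 [1, 3, 5] 29
```

With a step of 2 only the odd indices run. That is why the example submits 15 tasks rather than 30.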
3.3 Waiting for a job
That is, waiting for the job to finish:
```python
#!/usr/bin/env python
import drmaa
import os


def main():
    """
    Submit a job and wait for it to finish.
    Note, need file called sleeper.sh in current directory.
    """
    with drmaa.Session() as s:
        print('Creating job template')
        jt = s.createJobTemplate()
        jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
        jt.args = ['42', 'Simon says:']
        jt.joinFiles = True

        jobid = s.runJob(jt)
        print('Your job has been submitted with ID %s' % jobid)

        # wait() blocks until the job finishes
        retval = s.wait(jobid, drmaa.Session.TIMEOUT_WAIT_FOREVER)
        print('Job: {0} finished with status {1}'.format(retval.jobId, retval.hasExited))

        # For several jobs, use synchronize() instead of wait():
        # joblist = s.runBulkJobs(jt, 1, 30, 2)
        # print('Your jobs have been submitted with IDs %s' % joblist)
        # s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, True)

        print('Cleaning up')
        s.deleteJobTemplate(jt)


if __name__ == '__main__':
    main()
```
wait() returns a JobInfo tuple with the following attributes: jobId, hasExited, hasSignal, terminatedSignal, hasCoreDump, wasAborted, exitStatus, resourceUsage.
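Because JobInfo is a named tuple, you read its fields by name. The sketch below sorts a finished job into one of four outcomes, much as the checks in section 4.2 do. To run without a cluster it uses a stand-in namedtuple with the same field names; in real code you would pass the object returned by wait(). `classify` is a hypothetical helper. It relies on one detail of the DRMAA 1.0 specification: wasAborted means the job ended without ever entering the running state.

```python
from collections import namedtuple

# Stand-in with the same field names as drmaa's JobInfo, so this runs without a cluster.
JobInfo = namedtuple("JobInfo", "jobId hasExited hasSignal terminatedSignal "
                     "hasCoreDump wasAborted exitStatus resourceUsage")


def classify(info):
    """Return 'aborted', 'signaled', 'failed' or 'ok' for a finished job."""
    if info.wasAborted:
        return "aborted"    # the job ended without ever running
    if info.hasSignal:
        return "signaled"   # killed by the signal named in info.terminatedSignal
    if not info.hasExited or info.exitStatus != 0:
        return "failed"     # did not exit normally, or exited with a non-zero status
    return "ok"
```

Checking wasAborted and hasSignal before exitStatus matters. The exit status is only meaningful when hasExited is true.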
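The third argument of synchronize(), dispose, controls whether the call cleans up after the jobs. A finished job leaves behind accounting data, such as its exit status and resource usage, and that data is kept until the job is reaped, either by wait() or by synchronize() with dispose=True. Every job should be reaped by one of these two calls, otherwise memory can leak. To retrieve each job's statistics, call synchronize() with dispose=False and then call wait() on every job: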
```python
joblist = s.runBulkJobs(jt, 1, 30, 2)
print('Your jobs have been submitted with IDs %s' % joblist)
# dispose=False: keep each job's statistics so it can be wait()ed on once below
s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False)
for curjob in joblist:
    print('Collecting job ' + curjob)
    retval = s.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
    print('Job: {0} finished with status {1}'.format(retval.jobId, retval.hasExited))
```
3.4 Controlling a job
```python
#!/usr/bin/env python
import drmaa
import os


def main():
    """
    Submit a job, then kill it.
    Note, need file called sleeper.sh in current directory.
    """
    with drmaa.Session() as s:
        print('Creating job template')
        jt = s.createJobTemplate()
        jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
        jt.args = ['42', 'Simon says:']
        jt.joinFiles = True

        jobid = s.runJob(jt)
        print('Your job has been submitted with ID %s' % jobid)

        # options are: SUSPEND, RESUME, HOLD, RELEASE, TERMINATE
        s.control(jobid, drmaa.JobControlAction.TERMINATE)  # kill the job just submitted

        print('Cleaning up')
        s.deleteJobTemplate(jt)


if __name__ == '__main__':
    main()
```
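control() can also suspend, resume, hold, or release jobs. It also works on jobs that were not submitted through DRMAA: you can pass any valid SGE job ID to control() as the ID of the job to act on.
3.5 Querying job status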
```python
#!/usr/bin/env python
import drmaa
import time
import os


def main():
    """
    Submit a job, and check its progress.
    Note, need file called sleeper.sh in current directory.
    """
    with drmaa.Session() as s:
        print('Creating job template')
        jt = s.createJobTemplate()
        jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
        jt.args = ['42', 'Simon says:']
        jt.joinFiles = True

        jobid = s.runJob(jt)
        print('Your job has been submitted with ID %s' % jobid)

        # Who needs a case statement when you have dictionaries?
        decodestatus = {drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
                        drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
                        drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
                        drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
                        drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
                        drmaa.JobState.RUNNING: 'job is running',
                        drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
                        drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
                        drmaa.JobState.DONE: 'job finished normally',
                        drmaa.JobState.FAILED: 'job finished, but failed'}

        for ix in range(10):
            print('Checking %s of 10 times' % (ix + 1))
            # jobStatus() returns the job's current state; report its description
            state = s.jobStatus(jobid)
            print(decodestatus.get(state, state))
            time.sleep(5)

        print('Cleaning up')
        s.deleteJobTemplate(jt)


if __name__ == '__main__':
    main()
```
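The loop above checks a fixed 10 times. A common variant keeps polling until the job reaches a terminal state. The sketch below takes the status function and the set of terminal states as parameters, so it runs without a cluster. With drmaa you would pass `s.jobStatus` and `{drmaa.JobState.DONE, drmaa.JobState.FAILED}`. `poll_until_done` is a hypothetical helper, and the state strings in the demo are stand-ins.

```python
import time


def poll_until_done(get_status, jobid, terminal, interval=5.0, max_checks=None, sleep=time.sleep):
    """Poll get_status(jobid) until it returns a state in `terminal`.

    Returns the final state, or None if max_checks polls pass without one.
    """
    checks = 0
    while max_checks is None or checks < max_checks:
        state = get_status(jobid)
        checks += 1
        if state in terminal:
            return state
        sleep(interval)  # wait before asking the scheduler again
    return None


# Demo with a fake status sequence instead of a real session.
states = iter(["queued_active", "running", "done"])
final = poll_until_done(lambda _id: next(states), "42", {"done", "failed"}, sleep=lambda _s: None)
print(final)  # done
```

If all you need is to block until the job ends, wait() is simpler. Polling is mainly useful when you want to report progress in the meantime.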
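For more on the attributes and methods of JobInfo, JobTemplate, Session, and the other classes, see the drmaa-python documentation.

4. Applications
4.1 Writing a simple application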
```python
#!/usr/bin/env python
import os
import stat

import drmaa


class SGE():
    def __init__(self):
        self.__sgeProject = "Test"
        self.__sgeQueue = "test.q"
        self.__maxvmem = "1G"
        self.__proc = "1"
        self.__script = ""
        self.__workdir = ""
        self.__session = ""

    def setSgeProject(self, p):
        self.__sgeProject = p

    def getSgeProject(self):
        return self.__sgeProject

    def setSgeQueue(self, q):
        self.__sgeQueue = q

    def getSgeQueue(self):
        return self.__sgeQueue

    def setMaxvmem(self, m):
        self.__maxvmem = m

    def getMaxvmem(self):
        return self.__maxvmem

    def setNumproc(self, proc):
        self.__proc = proc

    def setScript(self, s):
        self.__script = s

    def getScript(self):
        return self.__script

    def setWorkDir(self, w):
        self.__workdir = w

    def getWorkDir(self):
        return self.__workdir

    def setSession(self, ss):
        self.__session = ss

    def getSession(self):
        return self.__session

    def submit(self):
        st = os.stat(self.__script)  # os.stat() returns the file's stat structure
        # S_IEXEC is a synonym for S_IXUSR (owner may execute); S_IXGRP lets the group execute
        os.chmod(self.__script, st.st_mode | stat.S_IEXEC | stat.S_IXGRP)
        jt = self.__session.createJobTemplate()  # allocate a job template
        jt.remoteCommand = self.__script  # script to execute
        jt.workingDirectory = self.__workdir  # the job's working directory
        par4qsub = "".join(["-binding linear:", self.__proc, " -P ", self.__sgeProject,
                            " -q ", self.__sgeQueue, " -cwd -l ", "vf=", self.__maxvmem,
                            " -l p=", self.__proc])
        print('qsub {0} {1}'.format(par4qsub, self.__script))
        jt.nativeSpecification = par4qsub  # native qsub options passed through to SGE
        jobid = self.__session.runJob(jt)  # runJob() returns the job ID
        self.__session.deleteJobTemplate(jt)
        return jobid


def main():
    with drmaa.Session() as s:
        sgeObj = SGE()
        sgeObj.setSession(s)
        sgeObj.setSgeProject("SGEProject")  # placeholder: your SGE project
        sgeObj.setSgeQueue("SGEQueue")  # placeholder: your queue
        dict_qsub_id = {}
        joblist = []
        cwdir = os.getcwd()
        sgeObj.setWorkDir(cwdir)
        sgeObj.setScript(os.path.join(cwdir, "test.sh"))
        sgeObj.setMaxvmem("Memory")  # placeholder: memory request, e.g. "1G"
        sgeObj.setNumproc("1")
        jobid = sgeObj.submit()
        dict_qsub_id[jobid] = os.path.join(cwdir, "test.sh")
        joblist.append(jobid)
        # dispose=False so that wait() can still collect each job's statistics
        s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False)
        for curjob in joblist:
            retval = s.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
            print('Job: {0} finished with status {1}'.format(retval.jobId, retval.hasExited))


if __name__ == "__main__":
    main()
```
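submit() passes SGE-specific qsub options through `jt.nativeSpecification`. If you move the string construction into a pure function, you can inspect it without a cluster. The sketch below mirrors the class above. `build_native_spec` is hypothetical. `vf` and `p` are the resource names the author's cluster uses, and yours may differ.

```python
def build_native_spec(project, queue, maxvmem, proc):
    """Build the same qsub option string that SGE.submit() assigns to jt.nativeSpecification."""
    return " ".join([
        "-binding linear:" + str(proc),  # bind the job to `proc` cores
        "-P " + project,                 # SGE project
        "-q " + queue,                   # target queue
        "-cwd",                          # run in the current working directory
        "-l vf=" + maxvmem,              # memory request (site-specific resource name)
        "-l p=" + str(proc),             # processor request (site-specific resource name)
    ])


print(build_native_spec("Test", "test.q", "1G", 1))
# -binding linear:1 -P Test -q test.q -cwd -l vf=1G -l p=1
```

Joining whole option fragments with spaces makes it harder to drop a separator than building the string piece by piece.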
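4.2 Application example 2
Background: I was using MEGAN to assign microbial taxonomy, but the BLAST results against nr were too large to annotate in one go, so I split them into pieces. Running MEGAN on Linux requires xvfb-run, a wrapper that provides the application with a virtual X server. xvfb cannot run in parallel, though. When I ran several annotations at the same time, the temporary .rma files MEGAN generates collided, and none of the concurrent runs produced results. So the jobs had to run serially. I had split the input into more than a hundred files, however, and could not submit them one by one by hand. How can the jobs run one after another automatically? A loop written with drmaa does the job.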
```python
import os

import drmaa

# Reuses the SGE class from 4.1 (which also needs `import stat`).
# Email is the author's own mail-notification helper (setReceiver/sendMail); it is not shown in this post.


def check_status(retval, running_log, path, email):
    maxvmem_gb = str(float(retval.resourceUsage['maxvmem']) / 1000000000)  # bytes -> GB
    if retval.exitStatus != 0:  # failed job: log it and send an email notification
        running_log.write('{0}\nError job: {1}\n exitStatus: {2}\n wasAborted: {3}\n maxvmem: {4}Gb\n Qsub_id: {5}\n\n'.format(
            "=" * 40, path, retval.exitStatus, retval.wasAborted, maxvmem_gb, retval.jobId))
        running_log.close()
        emailObj = Email()
        emailObj.setReceiver(email)
        emailObj.sendMail('\nError job: {0}\nexitStatus: {1}\nwasAborted: {2}\nmaxvmem: {3}\n'.format(
            path, retval.exitStatus, retval.wasAborted, maxvmem_gb))
        os._exit(0)
    elif retval.wasAborted:  # manually terminated job: log it, no email needed
        running_log.write('{0}\nAborted job: {1}\n exitStatus: {2}\n wasAborted: {3}\n maxvmem: {4}Gb\n Qsub_id: {5}\n\n'.format(
            "=" * 40, path, retval.exitStatus, retval.wasAborted, maxvmem_gb, retval.jobId))
        running_log.close()
        os._exit(0)
    else:
        running_log.write('{0}\nFinished job: {1}\n exitStatus: {2}\n wasAborted: {3}\n maxvmem: {4}Gb\n Qsub_id: {5}\n\n'.format(
            "=" * 40, path, retval.exitStatus, retval.wasAborted, maxvmem_gb, retval.jobId))
        running_log.flush()  # write to the file immediately


def main():
    project_dir = os.getcwd()
    running_log = open("running.log", "w")
    qsub_id_log = open("qsub_id.log", "w")
    dict_qsub_id = {}
    with drmaa.Session() as session:
        sgeObj = SGE()
        sgeObj.setSession(session)
        for i in range(1, 101):  # the 100 split tasks, run one at a time
            joblist = []
            subdir = "tax_" + str(i)
            cwdir = os.path.join(project_dir, subdir)
            sgeObj.setWorkDir(cwdir)
            shell = "run_tax_" + str(i) + ".sh"
            sgeObj.setScript(os.path.join(cwdir, shell))
            sgeObj.setMaxvmem("1G")
            sgeObj.setNumproc("1")
            jobid = sgeObj.submit()
            qsub_id_log.write(jobid + "\n")
            qsub_id_log.flush()
            dict_qsub_id[jobid] = os.path.join(cwdir, shell)
            joblist.append(jobid)
            # Block until this job finishes before the loop submits the next one
            session.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False)
            for curjob in joblist:
                retval = session.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
                check_status(retval, running_log, dict_qsub_id[retval.jobId], "123456@qq.com")
    emailObj = Email()
    emailObj.setReceiver("123456@qq.com")
    emailObj.sendMail('Finished job: {0}\n'.format(project_dir))
    print("{0} All work is done! {0}>".format("=" * 30))
    running_log.close()
    qsub_id_log.close()


if __name__ == "__main__":
    main()
```

Ref: https://drmaa-python.readthedocs.io/en/latest/tutorials.html#starting-and-stopping-a-session
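Without the logging and email, example 2 is a strict loop: submit a job, wait for it, check the result, then move to the next one. The sketch below shows that control flow without a cluster, taking `submit` and `wait` as callables. With drmaa they would wrap SGE.submit() and session.wait() and return the job's exit status. `run_serially` and the fake callables are hypothetical. Unlike the original, which calls os._exit(0) on the first failure, the sketch stops and returns the results collected so far.

```python
from collections import namedtuple

Result = namedtuple("Result", "script jobid exit_status")


def run_serially(scripts, submit, wait):
    """Submit scripts one at a time; each starts only after the previous one finished.

    Stops at the first non-zero exit status and returns the results collected so far.
    """
    results = []
    for script in scripts:
        jobid = submit(script)
        exit_status = wait(jobid)  # blocks, like session.wait(jobid, TIMEOUT_WAIT_FOREVER)
        results.append(Result(script, jobid, exit_status))
        if exit_status != 0:
            break
    return results


# Fake cluster for illustration: job IDs are counters and "run_tax_3.sh" fails.
submitted = {}


def fake_submit(script):
    jobid = str(len(submitted) + 1)
    submitted[jobid] = script
    return jobid


def fake_wait(jobid):
    return 1 if submitted[jobid] == "run_tax_3.sh" else 0


scripts = ["run_tax_%d.sh" % i for i in range(1, 6)]
for r in run_serially(scripts, fake_submit, fake_wait):
    print(r.script, r.jobid, r.exit_status)
# run_tax_1.sh 1 0
# run_tax_2.sh 2 0
# run_tax_3.sh 3 1
```

Returning the results instead of exiting the process lets the caller decide whether to log, notify, or retry. It also lets the drmaa session shut down cleanly.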