接上一篇。
客户端（client）代码如下：
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""oaos client daemon.

Runs in the background and repeatedly asks the master, over a ZeroMQ REQ
socket, for a task. Each task is executed either as a shell command or as a
script downloaded from an HTTP file server, and the outcome is reported back
to the master.

Usage: <script> start|stop|restart
"""
__author__ = 'tiger'

import atexit
import errno
import fcntl
import logging
import os
import random
import signal
import socket
import struct
import subprocess
import sys
import time
from logging.handlers import RotatingFileHandler
from signal import SIGTERM

try:
    import zmq
except ImportError:  # main() refuses to start without it; helpers stay usable
    zmq = None

# How long (milliseconds) to wait for any reply from the master before the
# REQ socket is torn down and reconnected.
_REPLY_TIMEOUT_MS = 20 * 1000
_TIMEOUT_MSG = "master server connect time out,will colse current socket,try again."


def mylog(logfile):
    """Return the root logger, writing INFO+ records to a rotating *logfile*.

    The file rotates at 50 MB and keeps 3 backups. Calling this again for the
    same file does not attach a second handler (that would duplicate lines).
    """
    log = logging.getLogger()
    log.setLevel(logging.INFO)
    target = os.path.abspath(logfile)
    for handler in log.handlers:
        if getattr(handler, 'baseFilename', None) == target:
            return log
    rthandler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024,
                                    backupCount=3)
    formatter = logging.Formatter(
        '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s '
        '%(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S')
    rthandler.setFormatter(formatter)
    log.addHandler(rthandler)
    return log


def get_ip_address(ifname):
    """Return the IPv4 address of network interface *ifname* (Linux only).

    Uses the SIOCGIFADDR ioctl; raises IOError/OSError if the interface does
    not exist or has no IPv4 address.
    """
    name = ifname[:15]  # kernel interface names are at most 15 bytes
    if not isinstance(name, bytes):
        name = name.encode('ascii')
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        packed = fcntl.ioctl(s.fileno(), 0x8915,  # SIOCGIFADDR
                             struct.pack('256s', name))
    finally:
        s.close()
    return socket.inet_ntoa(packed[20:24])


class ProcessTimeout(Exception):
    """Raised by the SIGALRM handler when a command exceeds its timeout."""
    pass


def timeout_handler(signum, frame):
    """SIGALRM handler: abort the blocking wait in exec_shell."""
    raise ProcessTimeout


def exec_shell(task, rundir=None, timeout=None):
    """Run *task* through the shell and return ``[returncode, output]``.

    *output* is the captured stdout when the command exits with 0, otherwise
    the captured stderr. If *timeout* (whole seconds) elapses first, the
    command's whole process group is killed and ``['timeout', <message>]``
    is returned.

    Raises OSError if the process cannot be started (e.g. *rundir* is
    missing); in that case no alarm is left pending.
    """
    # Own session/process group, so a timeout can kill everything the shell
    # spawned, not only the shell itself.
    p = subprocess.Popen(task, bufsize=0, shell=True, cwd=rundir,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         preexec_fn=os.setsid)
    # Arm the alarm only after Popen succeeded: an alarm left armed by a
    # failed Popen would raise ProcessTimeout later in unrelated code.
    if timeout:
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout)
    try:
        stdout, stderr = p.communicate()
        returncode = p.returncode
    except ProcessTimeout:
        p.stdout.close()
        p.stderr.close()
        try:
            os.killpg(p.pid, signal.SIGKILL)
        except OSError:
            pass  # the group already exited between the alarm and the kill
        p.wait()  # reap the shell so it does not linger as a zombie
        stderr = 'Calculation was taking too long, so I killed it dead.\n'
        returncode = 'timeout'
    finally:
        if timeout:
            signal.alarm(0)
    if returncode == 0:
        return [returncode, stdout]
    return [returncode, stderr]


def exec_script(fileserver, script, dirpath, rundir=None, timeout=None):
    """Download a script with wget, run it, then delete the local copy.

    :param fileserver: URL prefix the script name is appended to.
    :param script: dict with 'script_name' and 'script_env' (the interpreter
        command the script is passed to).
    :param dirpath: local directory the script is downloaded into.
    :returns: the ``[returncode, output]`` of the script, or of wget if the
        download failed. A failure to delete the script is appended to the
        output text.

    NOTE: names and URLs are interpolated into shell command lines unquoted;
    the master is trusted to send safe values.
    """
    name = script['script_name']
    filename = dirpath + name
    getfile = 'wget -N -P ' + dirpath + ' ' + fileserver + name
    getfile_res = exec_shell(getfile, rundir=rundir, timeout=timeout)
    if getfile_res[0] != 0:
        return getfile_res
    task = script['script_env'] + ' ' + filename
    task_res = exec_shell(task, rundir=rundir, timeout=timeout)
    try:
        os.remove(filename)
    except OSError as err:
        task_res[1] = task_res[1] + str(err)
    return task_res


def _connect(sock_file):
    """Open a new context and REQ socket connected to *sock_file*.

    Returns ``(context, sock, poller)``; the socket is registered for POLLIN
    so replies can be awaited with a timeout.
    """
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # never block on close with unsent data
    sock.connect(sock_file)
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    return context, sock, poller


def _reconnect(context, sock, sock_file):
    """Close *sock*/*context*, back off 1-5 s, and return a new connection.

    A REQ socket that missed its reply cannot send again, so closing and
    reopening it is the only way to recover.
    """
    sock.close()
    context.term()
    time.sleep(random.randint(1, 5))
    return _connect(sock_file)


def _report(job_id, code, info, mip):
    """Build the 'report' message sent to the master for a finished job."""
    return {'id': job_id, 'code': code, 'info': info, 'ip': mip,
            'req_type': 'report'}


def _run_job(rep, dirpath, mip, log):
    """Execute the job described by a 'newtask' reply and build its report.

    Returns the report dict, or None when the reply has no usable job id
    (there is nothing to report against).
    """
    job_id = None  # reset per job so a previous id is never reused
    try:
        job_id = rep['id']
        job_task = rep['task']
        job_type = rep['type']
        cmd_timeout = rep['cmdtimeout']
        rundir = rep['rundir']
        log.warning('start new job ' + str(rep))
    except KeyError as err:
        log.warning(str(err) + str(rep))
        if not job_id:
            return None
        return _report(job_id, '99', str(err), mip)

    # A rundir of the string 'None' (the add-job tool's default) means
    # "run inside dirpath".
    workdir = dirpath if rundir == 'None' else rundir
    try:
        if job_type == 's':
            script = {'script_name': job_task, 'script_env': rep['env']}
            res = exec_script(rep['fileserver'], script, dirpath,
                              rundir=workdir, timeout=cmd_timeout)
        else:
            # Every type other than 's' is executed as a shell command.
            res = exec_shell(job_task, rundir=workdir, timeout=cmd_timeout)
    except Exception as err:  # bad rundir, missing script fields, ...
        # Report the failure instead of crashing the daemon or leaving the
        # master waiting for a report that never comes.
        log.warning(str(err))
        res = ['99', str(err)]
    return _report(job_id, res[0], res[1], mip)


def ioloop(sock_file, homedir, mip, stdout):
    """Ask the master for tasks forever, run them and report the results.

    :param sock_file: ZeroMQ endpoint of the master, e.g. "tcp://host:port".
    :param homedir: client home; scripts are downloaded into homedir/run/,
        which is also the default working directory for jobs.
    :param mip: this client's IP address, sent with every request.
    :param stdout: path of the log file.

    NOTE: replies are unpickled (recv_pyobj); the master must be trusted.
    """
    dirpath = homedir + '/run/'
    log = mylog(stdout)
    context, sock, poller = _connect(sock_file)
    try:
        while True:
            try:
                sock.send_pyobj({'req_type': 'task', 'ip': mip})
            except Exception as err:
                log.warning(str(err))
                context, sock, poller = _reconnect(context, sock, sock_file)
                continue
            if not poller.poll(_REPLY_TIMEOUT_MS):
                log.warning(_TIMEOUT_MSG)
                context, sock, poller = _reconnect(context, sock, sock_file)
                continue
            rep = sock.recv_pyobj()
            try:
                rep_type = rep['rep_type']
            except (KeyError, TypeError, IndexError) as err:
                log.info(str(err))
                time.sleep(random.uniform(0.8, 1.2))
                continue
            if rep_type == 'newtask':
                report = _run_job(rep, dirpath, mip, log)
                if report is not None:
                    sock.send_pyobj(report)
                    # Wait with a timeout: a dead master must not hang us.
                    if not poller.poll(_REPLY_TIMEOUT_MS):
                        log.warning(_TIMEOUT_MSG)
                        context, sock, poller = _reconnect(context, sock,
                                                           sock_file)
                        continue
                    sock.recv_pyobj()
                    log.info(str({'id': report['id'], 'code': report['code'],
                                  'info': report['info'], 'ip': mip}))
            time.sleep(random.uniform(0.8, 1.2))
    finally:
        sock.close()
        context.term()


def ip_check(ip):
    """Return True if *ip* is a dotted-quad IPv4 address (0-255 per part)."""
    parts = ip.split('.')
    return len(parts) == 4 and all(
        part.isdigit() and 0 <= int(part) <= 255 for part in parts)


def _pid_alive(pid):
    """Return True if a process with *pid* exists."""
    try:
        os.kill(pid, 0)  # signal 0: existence/permission check only
    except OSError as err:
        return err.errno == errno.EPERM  # exists, owned by someone else
    return True


class Daemon(object):
    """Double-fork UNIX daemon managed through a pidfile.

    Subclasses override _run() with the daemon's main loop. sock_file,
    homedir and mip are stored for use by that loop.
    """

    def __init__(self, pidfile, sock_file, homedir, mip, stdin='/dev/null',
                 stdout='/dev/null', stderr='/dev/null'):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
        self.homedir = homedir
        self.sock_file = sock_file
        self.mip = mip

    def _daemonize(self):
        """Detach from the terminal, redirect stdio and write the pidfile."""
        # First fork: the launching process exits, we continue in the child.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        # New session: detach from the controlling terminal.
        os.setsid()
        os.chdir("/")
        os.umask(0)
        # Second fork: a non-session-leader can never reacquire a terminal.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        sys.stdout.flush()
        sys.stderr.flush()
        si = open(self.stdin, 'r')
        so = open(self.stdout, 'a+')
        se = open(self.stderr, 'a+')
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())
        # Remove the pidfile on normal interpreter exit.
        atexit.register(self.delpid)
        with open(self.pidfile, 'w+') as pf:
            pf.write("%s\n" % os.getpid())

    def _read_pid(self):
        """Return the pid stored in the pidfile, or None if missing/garbled."""
        try:
            with open(self.pidfile, 'r') as pf:
                return int(pf.read().strip())
        except (IOError, ValueError):
            return None

    def delpid(self):
        """Remove the pidfile; a missing pidfile is not an error."""
        try:
            os.remove(self.pidfile)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise

    def start(self):
        """Daemonize and run, unless a live daemon already owns the pidfile."""
        pid = self._read_pid()
        if pid and _pid_alive(pid):
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            sys.exit(1)
        if pid:
            # Stale pidfile (e.g. the daemon was SIGKILLed and atexit never
            # ran): clear it instead of refusing to start forever.
            self.delpid()
        self._daemonize()
        self._run()

    def stop(self):
        """SIGTERM the daemon until it is gone, then remove the pidfile."""
        pid = self._read_pid()
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart
        try:
            while True:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError as err:
            if err.errno == errno.ESRCH:  # process has exited
                self.delpid()
            else:
                print(str(err))
                sys.exit(1)

    def restart(self):
        """Stop the daemon (if running) and start it again."""
        self.stop()
        self.start()

    def _run(self):
        """Main loop of the daemon; override in subclasses."""
        pass


class MyDaemon(Daemon):
    """Daemon whose main loop is ioloop()."""

    def _run(self):
        ioloop(self.sock_file, self.homedir, self.mip, self.stdout)


def main():
    """Prepare log/ and run/ under the CWD and dispatch start|stop|restart."""
    homedir = os.getcwd()
    for i in ('log', 'run'):
        path = homedir + '/' + i
        if not os.path.exists(path):
            os.makedirs(path, 0o755)
    stdout = homedir + '/log' + '/oaos_client.log'
    stderr = homedir + '/log' + '/oaos_client.err'
    pidfile = homedir + '/run' + '/oaos_client.pid'
    # ZeroMQ endpoint of the master.
    sock_file = "tcp://192.168.4.194:7777"
    # Interface whose IPv4 address identifies this client to the master.
    ifname = 'eth0'
    try:
        mip = get_ip_address(ifname)
    except (IOError, OSError) as err:
        print(err)
        sys.exit(3)
    daemon = MyDaemon(pidfile, sock_file, homedir, mip,
                      stdout=stdout, stderr=stderr)
    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)
    command = sys.argv[1]
    if command in ('start', 'restart') and zmq is None:
        # Fail in the foreground instead of inside the detached daemon.
        print("pyzmq is required to start the client")
        sys.exit(3)
    if command == 'start':
        daemon.start()
    elif command == 'stop':
        daemon.stop()
    elif command == 'restart':
        daemon.restart()
    else:
        print("Unknown command")
        sys.exit(2)
    sys.exit(0)


if __name__ == "__main__":
    main()
增加任务的接口代码如下：
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""oaos add-job tool.

Reads a job description from an INI file, submits it to the master over a
ZeroMQ REQ socket, then follows the job's report log until the
"job <id> end" line appears.
"""
__author__ = 'tiger'

import logging
import os
import subprocess
import time
from logging.handlers import RotatingFileHandler

try:
    import configparser as ConfigParser  # Python 3
    _ConfigParser = ConfigParser.ConfigParser
except ImportError:
    import ConfigParser  # Python 2
    _ConfigParser = ConfigParser.SafeConfigParser

try:
    import zmq
except ImportError:  # only add_job() needs it; read_task() works without
    zmq = None


def mylog(logfile):
    """Return the root logger, writing INFO+ records to a rotating *logfile*.

    The file rotates at 50 MB and keeps 3 backups. Calling this again for the
    same file does not attach a second handler (that would duplicate lines).
    """
    log = logging.getLogger()
    log.setLevel(logging.INFO)
    target = os.path.abspath(logfile)
    for handler in log.handlers:
        if getattr(handler, 'baseFilename', None) == target:
            return log
    rthandler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024,
                                    backupCount=3)
    formatter = logging.Formatter(
        '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s '
        '%(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S')
    rthandler.setFormatter(formatter)
    log.addHandler(rthandler)
    return log


def ip_check(ip):
    """Return True if *ip* is a dotted-quad IPv4 address (0-255 per part)."""
    parts = ip.split('.')
    return len(parts) == 4 and all(
        part.isdigit() and 0 <= int(part) <= 255 for part in parts)


def _get(config, section, option, default, convert=None):
    """Return an optional option (converted), or *default* if absent/invalid."""
    try:
        value = config.get(section, option)
        return convert(value) if convert else value
    except (ConfigParser.Error, ValueError):
        return default


def read_task(config_file):
    """Parse the job INI file *config_file*.

    Returns ``[ip_dic, job_info, master_sock]`` on success, or 0 (after
    printing the reason) when a required option is missing or an address in
    the IP list is invalid. ip_dic maps each target IP to 'N'.

    job_info fields:
      id         -- job id: current Unix time (seconds) + 1.
      type       -- 'c' (command, default) or 's' (script).
      task       -- the command line, or the script name for type 's'.
      jobtimeout -- seconds the whole job may take (default 20).
      cmdtimeout -- seconds a client may spend executing the task
                    (default 10); guards against commands that never exit.
      env        -- type 's' only: interpreter for the script (default 'sh').
      fileserver -- type 's' only: HTTP URL prefix the script is fetched from.
      rep_type   -- always 'newtask', telling the client a job follows.
      rundir     -- working directory on the client; the string 'None'
                    (default) leaves the choice to the client.
    """
    config = _ConfigParser()
    config.read(config_file)
    job_id = int(time.time()) + 1
    try:
        master_sock = config.get("master", 'sock')
        ip_list = config.get("ip_list", 'ip').split(',')
        # raw=True: shell commands often contain '%' (e.g. "date +%F"),
        # which interpolation would otherwise reject.
        job_task = config.get("job_info", 'task', raw=True)
    except ConfigParser.Error as err:
        print(err)
        return 0
    ip_dic = {}
    for ip in ip_list:
        ip = ip.strip()  # tolerate "a, b" style lists
        if not ip_check(ip):
            print("ip error :%s" % ip)
            return 0
        ip_dic[ip] = 'N'
    job_type = _get(config, "job_info", 'type', 'c')
    job_rundir = _get(config, "job_info", 'rundir', 'None')
    job_cmdtimeout = _get(config, "job_info", 'cmdtimeout', 10, int)
    job_jobtimeout = _get(config, "job_info", 'jobtimeout', 20, int)
    job_info = {'id': job_id, 'type': job_type, 'task': job_task,
                'jobtimeout': job_jobtimeout, 'cmdtimeout': job_cmdtimeout,
                'rep_type': 'newtask', 'rundir': job_rundir}
    if job_type == 's':
        job_info['env'] = _get(config, "job_info", 'env', 'sh')
        job_info['fileserver'] = _get(config, "job_info", 'fileserver',
                                      'http://192.168.0.227/ser/')
    return [ip_dic, job_info, master_sock]


def _follow_report(log_file, job_id):
    """Print *log_file* as it grows until the 'job <id> end' line appears.

    Also stops if tail exits (e.g. the file does not exist). The tail
    process is always terminated before returning.
    """
    end_flag = 'job ' + str(job_id) + ' end'
    # Argument list instead of a shell string: log_file comes from the master.
    p = subprocess.Popen(['tail', '-f', log_file], stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    print('----------------------job report start--------------------------')
    try:
        for line in iter(p.stdout.readline, ''):
            line = line.strip()
            print(line)
            if line == end_flag:
                break
    finally:
        if p.poll() is None:
            p.terminate()
        p.wait()
    print('----------------------job report end---------------------------')


def add_job(config_file, logfile, reply_timeout=20):
    """Submit the job in *config_file* to the master and stream its report.

    :param config_file: path of the job INI file (see read_task).
    :param logfile: path of this tool's log file.
    :param reply_timeout: seconds to wait for the master's reply before
        giving up.

    The master's reply is printed and logged; its last whitespace-separated
    word is taken as the path of the job's report log, which is then
    followed. NOTE: the reply is unpickled, so the master must be trusted.
    """
    log = mylog(logfile)
    cfg_info = read_task(config_file)
    if cfg_info == 0:  # read_task already printed why
        return
    ip_dic, job_info, sock_file = cfg_info
    job_id = job_info['id']
    task = {'req_type': 'addjob', 'job_info': job_info, 'iplist': ip_dic}
    if zmq is None:
        print("pyzmq is required to submit jobs")
        return
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)
    try:
        try:
            sock.connect(sock_file)
            sock.send_pyobj(task)
        except Exception as err:
            # Nothing was sent, so waiting for a reply would hang forever.
            log.error("connect to master error " + str(err))
            print("connect to master error %s " % str(err))
            return
        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN)
        if not poller.poll(reply_timeout * 1000):
            log.error("no reply from master within %s seconds" % reply_timeout)
            print("no reply from master within %s seconds" % reply_timeout)
            return
        report = sock.recv_pyobj()
    finally:
        sock.close()
        context.term()
    print(report)
    log.info(report)
    try:
        log_file = report.split()[-1]
    except (AttributeError, IndexError):
        return  # reply does not name a report log; nothing to follow
    _follow_report(log_file, job_id)

# main(): create the working directories and choose the config file name.
def main():
    """Ensure log/ and run/ exist under the CWD, then submit task.ini."""
    base = os.getcwd()
    for sub in ('log', 'run'):
        target = '%s/%s' % (base, sub)
        if os.path.exists(target):
            continue
        os.makedirs(target, 0o755)
    add_job('task.ini', 'log/oaos_add_job.log')


if __name__ == "__main__":
    main()
任务配置文件（task.ini）示例如下：
[master]
sock=tcp://192.168.4.194:7777
[ip_list]
ip=192.168.4.195,192.168.4.196,192.168.4.197,192.168.4.198
[job_info]
type=c
task=ls -rlt
cmdtimeout=10
jobtimeout=10
env=sh
fileserver=http://192.168.0.227/ser/
rundir=/root/
实例演示见《简易Linux集中管理工具（三）》。
One Response