接上一篇

   客户端代码如下

   

#-*- coding: UTF-8 -*-
__author__ = 'tiger'
#!/usr/bin/env python
import zmq, time, sys, os, atexit
import errno
from signal import SIGTERM
import random
import logging
#import ConfigParser
import subprocess
from logging.handlers import RotatingFileHandler
import socket
import fcntl
import struct
import signal
   
#定义日志函数
def mylog(logfile):
    """Attach a size-rotating file handler for *logfile* to the root logger.

    The file rotates at 50 MiB and keeps 3 backups; the root logger level is
    set to INFO. Each call adds another handler, so call it once per process.
    Returns the root logger.
    """
    root = logging.getLogger()
    handler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3)
    handler.setFormatter(logging.Formatter(
        '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s %(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S'))
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    return root
   
#获取指定接口的IP地址
def get_ip_address(ifname):
    """Return the IPv4 address (dotted quad) assigned to interface *ifname*.

    Uses the Linux SIOCGIFADDR ioctl (0x8915). Raises IOError/OSError when the
    ioctl fails, for example when the interface does not exist.
    """
    # The ioctl expects a byte string; accept text names as well.
    if not isinstance(ifname, bytes):
        ifname = ifname.encode('ascii')
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # struct ifreq: interface name truncated to 15 bytes + NUL, padded to 256.
        ifreq = fcntl.ioctl(s.fileno(), 0x8915,  # SIOCGIFADDR
                            struct.pack('256s', ifname[:15]))
    finally:
        # The socket only exists to issue the ioctl; never leak it.
        s.close()
    # Bytes 20:24 of the returned ifreq hold sin_addr of the sockaddr_in.
    return socket.inet_ntoa(ifreq[20:24])
   
#定义命令或者脚本执行超时所触发的错误
class ProcessTimeout(Exception):
    """Raised (via timeout_handler) when a command exceeds its time limit."""


# SIGALRM handler installed by exec_shell: interrupt the blocking wait.
def timeout_handler(signum, frame):
    raise ProcessTimeout


# Run a shell command with an optional timeout.
def exec_shell(task, rundir=None, timeout=None):
    """Run *task* through the shell and collect its result.

    :param task: command line, executed with shell=True.
    :param rundir: working directory for the command (None = current one).
    :param timeout: whole seconds before the command is killed; falsy disables it.
    :returns: [returncode, stdout] when the command exits with status 0,
        otherwise [returncode, stderr]. On timeout returncode is the string
        'timeout' and the second item is an explanatory message.
    :raises OSError: if the command cannot be started (e.g. rundir missing).

    The timeout is implemented with SIGALRM, so this must run in the main thread.
    """
    if timeout:
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(timeout)
    p = None
    try:
        # Own session/process group, so on timeout the shell AND everything it
        # started can be killed together (terminating only `sh` leaves its
        # children running).
        p = subprocess.Popen(task, bufsize=0, shell=True, cwd=rundir,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             preexec_fn=os.setsid)
        stdout, stderr = p.communicate()
        returncode = p.poll()
    except ProcessTimeout:
        if p is not None:
            try:
                os.killpg(p.pid, signal.SIGKILL)
            except OSError:
                pass  # the group already exited
            p.stdout.close()
            p.stderr.close()
            p.wait()  # reap the shell so it does not linger as a zombie
        stderr = 'Calculation was taking too long, so I killed it dead.\n'
        returncode = 'timeout'
    finally:
        # Always cancel the alarm, including when Popen/communicate raise;
        # otherwise a stray ProcessTimeout would fire later in unrelated code.
        signal.alarm(0)
    # Success returns stdout, anything else returns stderr.
    if returncode == 0:
        return [returncode, stdout]
    return [returncode, stderr]
   
#任务类别为脚本时调用该函数
def exec_script(fileserver, script, dirpath, rundir=None, timeout=None):
    """Download a script with wget, run it, then delete the local copy.

    :param fileserver: base URL; the script name is appended directly, so it
        should end with '/'.
    :param script: dict with 'script_name' (file name on the server) and
        'script_env' (interpreter command placed before the script path).
    :param dirpath: local directory (with trailing '/') wget saves into.
    :param rundir: working directory passed to exec_shell for both steps.
    :param timeout: exec_shell timeout applied separately to download and run.
    :returns: exec_shell's [code, output] for the script run, or for the wget
        call when the download fails. If deleting the local copy fails, the
        error text is appended to the output.
    """
    fileurl = fileserver + script['script_name']
    # -N: only re-download when the server copy is newer; -P: target directory.
    getfile = 'wget -N -P ' + dirpath + ' ' + fileurl
    filename = dirpath + script['script_name']
    task = script['script_env'] + ' ' + filename
    getfile_res = exec_shell(getfile, rundir=rundir, timeout=timeout)
    if getfile_res[0] != 0:
        return getfile_res
    task_res = None
    try:
        task_res = exec_shell(task, rundir=rundir, timeout=timeout)
    finally:
        # Delete the downloaded copy even when the run could not be started
        # (exec_shell raising), so scripts do not pile up in dirpath.
        try:
            os.remove(filename)
        except Exception as err:
            if task_res is not None:
                task_res[1] = task_res[1] + str(err)
    return task_res
   
#与master建立连接完成与master的通信
def _connect(sock_file):
    """Open a fresh ZeroMQ context and REQ socket connected to *sock_file*.

    Returns (context, sock, poller), with poller registered for POLLIN on sock.
    """
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    # LINGER=0: discard unsent messages on close so context.term() cannot block
    # while the master is unreachable.
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(sock_file)
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    return context, sock, poller


def _reconnect(context, sock, sock_file):
    """Close the current socket/context, back off 1-5 s and connect again.

    A REQ socket whose reply never arrived cannot send again, so it must be
    replaced instead of reused.
    """
    sock.close()
    context.term()
    time.sleep(random.randint(1, 5))
    return _connect(sock_file)


def _send_report(sock, poller, report, timeout_ms, log):
    """Send *report* and wait up to *timeout_ms* for the master's acknowledgement.

    Returns True if the acknowledgement arrived, False on timeout or error;
    in the latter case the caller must reconnect.
    """
    try:
        sock.send_pyobj(report)
        if poller.poll(timeout_ms):
            sock.recv_pyobj()
            return True
        log.warn('no acknowledgement from master for report of job ' + str(report.get('id')))
    except Exception as err:
        log.warn(str(err))
    return False


def ioloop(sock_file, homedir, mip, stdout):
    """Ask the master for jobs forever, run each job and report its result.

    :param sock_file: ZeroMQ endpoint of the master, e.g. "tcp://host:port".
    :param homedir: client home directory; scripts are downloaded to, and jobs
        without an explicit rundir run in, homedir + '/run/'.
    :param mip: this client's IP, sent as 'ip' with every request.
    :param stdout: path of the log file.

    A failed job (bad job fields, a command/script that cannot be started) is
    reported with code '99'. If the master does not answer within 20 s, the
    connection is rebuilt.
    """
    dirpath = homedir + '/run/'
    log = mylog(stdout)
    reply_timeout_ms = 20 * 1000
    context, sock, poller = _connect(sock_file)
    try:
        while True:
            # Ask the master whether there is a job for this client.
            try:
                sock.send_pyobj({'req_type': 'task', 'ip': mip})
            except Exception as err:
                log.warn(str(err))
                context, sock, poller = _reconnect(context, sock, sock_file)
                continue
            if not poller.poll(reply_timeout_ms):
                # Master did not answer in time: rebuild the connection.
                log.warn("master server connect time out,will colse current socket,try again.")
                context, sock, poller = _reconnect(context, sock, sock_file)
                continue
            # NOTE(review): recv_pyobj unpickles whatever arrives, so the master
            # endpoint must be trusted.
            rep = sock.recv_pyobj()
            try:
                rep_type = rep['rep_type']
            except Exception as err:
                log.info(str(err))
                time.sleep(random.uniform(0.8, 1.2))
                continue
            if rep_type != 'newtask':
                time.sleep(random.uniform(0.8, 1.2))
                continue
            # Reset for every job so a failure below can never be reported under
            # the id of a previous job (or hit an unbound name on the first one).
            job_id = None
            try:
                job_id = rep['id']
                job_task = rep['task']
                job_type = rep['type']
                cmd_timeout = rep['cmdtimeout']
                rundir = rep['rundir']
                log.warn('start new job ' + str(rep))
            except Exception as err:
                if job_id:
                    report = {'id': job_id, 'code': '99', 'info': str(err), 'ip': mip, 'req_type': 'report'}
                    if not _send_report(sock, poller, report, reply_timeout_ms, log):
                        context, sock, poller = _reconnect(context, sock, sock_file)
                time.sleep(random.uniform(0.8, 1.2))
                log.warn(str(err) + str(rep))
                continue
            # The string 'None' means "no explicit run directory" for this job.
            workdir = dirpath if rundir == 'None' else rundir
            try:
                if job_type == 's':
                    # Script job: needs the interpreter and the file server URL.
                    script = {'script_name': job_task, 'script_env': rep['env']}
                    res = exec_script(rep['fileserver'], script, dirpath, rundir=workdir, timeout=cmd_timeout)
                else:
                    # Any other type is run as a shell command.
                    res = exec_shell(job_task, rundir=workdir, timeout=cmd_timeout)
            except Exception as err:
                # Report the failure instead of crashing the daemon or silently
                # leaving the master waiting for the job timeout.
                log.warn(str(err))
                res = ['99', str(err)]
            # req_type 'report' tells the master this request carries a job result.
            report = {'id': job_id, 'code': res[0], 'info': res[1], 'ip': mip, 'req_type': 'report'}
            if not _send_report(sock, poller, report, reply_timeout_ms, log):
                context, sock, poller = _reconnect(context, sock, sock_file)
            log.info(str({'id': job_id, 'code': res[0], 'info': res[1], 'ip': mip}))
            time.sleep(random.uniform(0.8, 1.2))
    finally:
        sock.close()
        context.term()
   
   
def ip_check(ip):
    """Return True if *ip* is a dotted-quad IPv4 address (four 0-255 decimal fields)."""
    parts = ip.split('.')
    if len(parts) != 4:
        return False
    for part in parts:
        # Accept ASCII digits only: str.isdigit() also accepts characters such
        # as u'\u00b2' which int() cannot parse.
        if not part or any(c not in '0123456789' for c in part):
            return False
        if int(part) > 255:
            return False
    return True
   
   
class Daemon:
    """Minimal double-fork UNIX daemon guarded by a pidfile.

    Subclasses override _run() with the work done by the detached process.
    The client settings (sock_file, homedir, mip) are stored for _run().
    """

    def __init__(self, pidfile, sock_file, homedir, mip, stdin='/dev/null', stdout='/dev/null',
                 stderr='/dev/null'):
        # stdin/stdout/stderr: files the daemon's standard streams are redirected to
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
        self.homedir = homedir
        self.sock_file = sock_file
        self.mip = mip

    def _daemonize(self):
        """Detach from the terminal (double fork), redirect std streams, write the pidfile."""
        # First fork: the parent exits, the child carries on in the background.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        # New session: detach from the controlling terminal.
        os.setsid()
        os.chdir("/")
        # NOTE(review): umask 0 is inherited by every command/script the daemon
        # runs, so files they create are world-writable by default; consider 022.
        os.umask(0)
        # Second fork: the grandchild is not a session leader, so it can never
        # reacquire a controlling terminal.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        sys.stdout.flush()
        sys.stderr.flush()
        # Point fds 0/1/2 at the configured files. dup2 copies the descriptor,
        # so the temporary file objects can be closed right away.
        for path, mode, stream in ((self.stdin, 'r', sys.stdin),
                                   (self.stdout, 'a+', sys.stdout),
                                   (self.stderr, 'a+', sys.stderr)):
            f = open(path, mode)
            os.dup2(f.fileno(), stream.fileno())
            f.close()
        # Remove the pidfile on normal interpreter exit.
        atexit.register(self.delpid)
        pid = str(os.getpid())
        with open(self.pidfile, 'w+') as pf:
            pf.write("%s\n" % pid)

    def delpid(self):
        """Remove the pidfile; a file that is already gone is not an error."""
        try:
            os.remove(self.pidfile)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise

    def _read_pid(self):
        """Return the positive pid stored in the pidfile, or None.

        Missing, unreadable, non-numeric or non-positive content yields None.
        Values <= 0 are rejected because os.kill(0 or -1, ...) would signal
        whole process groups or every process the user may signal.
        """
        try:
            with open(self.pidfile, 'r') as pf:
                pid = int(pf.read().strip())
        except (IOError, ValueError):
            return None
        return pid if pid > 0 else None

    def _is_running(self, pid):
        """Return True if a process with *pid* exists (signal 0 probe)."""
        try:
            os.kill(pid, 0)
        except OSError as err:
            # EPERM: the process exists but belongs to another user.
            return err.errno == errno.EPERM
        return True

    def start(self):
        """Start the daemon unless the pidfile names a live process."""
        pid = self._read_pid()
        if pid is not None:
            if self._is_running(pid):
                message = "pidfile %s already exist. Daemon already running?\n"
                sys.stderr.write(message % self.pidfile)
                sys.exit(1)
            # Stale pidfile left by a daemon that died without cleanup
            # (e.g. killed by a signal, which skips atexit handlers).
            self.delpid()
        self._daemonize()
        self._run()

    def stop(self):
        """Send SIGTERM until the process is gone, then remove the pidfile."""
        pid = self._read_pid()
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart
        # Keep signalling until os.kill reports the process has disappeared.
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError as err:
            if err.errno == errno.ESRCH:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print(str(err))
                sys.exit(1)

    def restart(self):
        self.stop()
        self.start()

    def _run(self):
        """Daemon body; subclasses override this."""
        pass
   
#重写Daemon的_run函数实现自己的Daemon进程
class MyDaemon(Daemon):
    """Client daemon: its detached main loop is ioloop() talking to the master."""

    def _run(self):
        # Invoked by Daemon.start() right after _daemonize().
        ioloop(sock_file=self.sock_file, homedir=self.homedir, mip=self.mip, stdout=self.stdout)
   
#定义主函数,创建相关运行目录和定义日志路径等
def main():
    """Client entry point.

    Creates ./log and ./run under the current directory and resolves this
    host's IP on eth0 (exit status 3 on failure). It then dispatches the single
    command-line argument start|stop|restart to the daemon. Exits 2 on bad
    usage and 0 once the command returns.
    """
    homedir = os.getcwd()
    for sub in ('log', 'run'):
        target = homedir + '/' + sub
        if not os.path.exists(target):
            os.makedirs(target, 0o755)
    log_dir = homedir + '/log'
    run_dir = homedir + '/run'
    # ZeroMQ endpoint of the master (hard-coded).
    sock_file = "tcp://192.168.4.194:7777"
    # Interface whose address identifies this client to the master.
    ifname = 'eth0'
    try:
        mip = get_ip_address(ifname)
    except Exception as err:
        print(err)
        sys.exit(3)
    daemon = MyDaemon(run_dir + '/oaos_client.pid', sock_file, homedir, mip,
                      stdout=log_dir + '/oaos_client.log',
                      stderr=log_dir + '/oaos_client.err')
    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)
    commands = {'start': daemon.start, 'stop': daemon.stop, 'restart': daemon.restart}
    command = commands.get(sys.argv[1])
    if command is None:
        print("Unknown command")
        sys.exit(2)
    command()
    sys.exit(0)
   
   
# Run only when executed as a script (usage: <script> start|stop|restart), not on import.
if __name__ == "__main__":
    main()

增加任务的接口:

#-*- coding: UTF-8 -*-
__author__ = 'tiger'
#!/usr/bin/env python
import zmq, time, os
#import random
import subprocess
import logging, ConfigParser
from logging.handlers import RotatingFileHandler
   
#定义日志函数
def mylog(logfile):
    """Attach a size-rotating file handler for *logfile* to the root logger.

    The file rotates at 50 MiB and keeps 3 backups; the root logger level is
    set to INFO. Each call adds another handler, so call it once per process.
    Returns the root logger.
    """
    root = logging.getLogger()
    handler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3)
    handler.setFormatter(logging.Formatter(
        '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s %(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S'))
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    return root
   
#定义IP检查判断IP是否正确
def ip_check(ip):
    """Return True if *ip* is a dotted-quad IPv4 address (four 0-255 decimal fields)."""
    parts = ip.split('.')
    if len(parts) != 4:
        return False
    for part in parts:
        # Accept ASCII digits only: str.isdigit() also accepts characters such
        # as u'\u00b2' which int() cannot parse.
        if not part or any(c not in '0123456789' for c in part):
            return False
        if int(part) > 255:
            return False
    return True
   
#配置读取函数
def read_task(config_file):
    """Read a job definition from the INI file *config_file*.

    Required options: [master] sock, [ip_list] ip (comma separated, spaces
    allowed), [job_info] task. Optional [job_info] options and defaults:
    type='c', rundir='None', cmdtimeout=10, jobtimeout=20, and for type 's'
    also env='sh', fileserver='http://192.168.0.227/ser/'. Unparsable
    timeouts fall back to their defaults.

    :returns: [ip_dic, job_info, master_sock] where ip_dic maps every target IP
        to 'N' and job_info is the 'newtask' message for the clients; or 0
        (after printing the reason) if a required option is missing or an IP
        is malformed.
    """
    config = ConfigParser.SafeConfigParser()
    config.read(config_file)
    # Job id from the current time in seconds: two jobs submitted within the
    # same second get the same id.
    job_id = int(time.time()) + 1
    try:
        master_sock = config.get("master", 'sock')
        ip_list = config.get("ip_list", 'ip').split(',')
        job_task = config.get("job_info", 'task')
    except ConfigParser.Error as err:
        print(err)
        return 0
    ip_dic = {}
    for ip in ip_list:
        # Tolerate "a, b" style lists.
        ip = ip.strip()
        if not ip_check(ip):
            print("ip error :%s" % ip)
            return 0
        ip_dic[ip] = 'N'

    def optional(name, default, convert=None):
        # Value of [job_info] name, or default when absent or not convertible.
        try:
            value = config.get("job_info", name)
        except ConfigParser.Error:
            return default
        if convert is None:
            return value
        try:
            return convert(value)
        except ValueError:
            return default

    job_type = optional('type', 'c')
    job_info = {'id': job_id, 'type': job_type, 'task': job_task,
                'jobtimeout': optional('jobtimeout', 20, int),
                'cmdtimeout': optional('cmdtimeout', 10, int),
                'rep_type': 'newtask',
                'rundir': optional('rundir', 'None')}
    if job_type == 's':
        # Script jobs also need the interpreter and the download location.
        job_info['env'] = optional('env', 'sh')
        job_info['fileserver'] = optional('fileserver', 'http://192.168.0.227/ser/')
    return [ip_dic, job_info, master_sock]
#变量说明:id代表任务的id号,根据当前时间(秒)生成;job_type代表任务的类别,c代表命令,s代表脚本;job_task代表具体的命令或脚本名,如果任务
#类别是命令则直接执行,如果是脚本,客户端将根据脚本名去http服务器下载相关脚本;jobtimeout代表整个任务的超时时间,如果客户端没有在该时间内
#报告任务状态则超时;cmdtimeout代表客户端执行具体任务的超时时间,例如top命令如果不带参数将永远执行,cmdtimeout就是为了避免类似情况;
#job_env代表任务的执行环境,通常只适用于执行脚本;job_fileserver是任务类别为脚本时下载脚本的http服务器地址和路径;rep_type是响应给客户端的状态,
#如果响应的信息为newtask,客户端将知道是有新任务了,会尝试获取任务所需的其他信息;rundir是任务运行的具体路径,未配置时为字符串'None'。
   
def _submit_job(sock_file, task, log, reply_timeout):
    """Send *task* to the master at *sock_file* and return its reply.

    Returns None (after printing and logging the reason) if connecting or
    sending fails, or if no reply arrives within *reply_timeout* seconds.
    """
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)
    try:
        sock.connect(sock_file)
        sock.send_pyobj(task)
        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN)
        if not poller.poll(reply_timeout * 1000):
            msg = "master did not reply within %d seconds" % reply_timeout
            log.info(msg)
            print(msg)
            return None
        # NOTE(review): recv_pyobj unpickles the reply; the master must be trusted.
        return sock.recv_pyobj()
    except Exception as err:
        log.info("connect to master error " + str(err))
        print("connect to master error %s " % str(err))
        return None
    finally:
        sock.close()
        context.term()


def _follow_job_log(log_file, job_id):
    """Print lines appended to *log_file* until 'job <job_id> end' appears or tail exits."""
    end_flag = 'job ' + str(job_id) + ' end'
    # Argument list (no shell): log_file comes from the master's reply.
    # stderr is merged so tail errors (e.g. missing file) are shown.
    p = subprocess.Popen(['tail', '-f', log_file], stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, universal_newlines=True)
    print('----------------------job report start--------------------------')
    try:
        returncode = p.poll()
        while returncode is None:
            line = p.stdout.readline()
            returncode = p.poll()
            line = line.strip()
            print(line)
            if line == end_flag:
                break
    finally:
        # `tail -f` never ends on its own; stop and reap it.
        if p.poll() is None:
            p.terminate()
        p.stdout.close()
        p.wait()
    print('----------------------job report end---------------------------')


def add_job(config_file, logfile, reply_timeout=20):
    """Submit the job described by *config_file* to the master and stream its report.

    :param config_file: INI job definition read by read_task().
    :param logfile: log file for this tool.
    :param reply_timeout: seconds to wait for the master's reply (default 20).

    The master's reply is printed and logged. Its last whitespace-separated
    word is taken as the path of the job report log, which is followed until
    the line 'job <id> end'. Returns None in every case.
    """
    log = mylog(logfile)
    cfg_info = read_task(config_file)
    # read_task returns 0 when the configuration is unusable.
    if cfg_info == 0:
        return
    ip_dic, job_info, sock_file = cfg_info
    job_id = job_info['id']
    # 'addjob' request: ask the master to dispatch job_info to every IP in ip_dic.
    task = {'req_type': 'addjob', 'job_info': job_info, 'iplist': ip_dic}
    report = _submit_job(sock_file, task, log, reply_timeout)
    if report is None:
        return
    print(report)
    log.info(report)
    try:
        log_file = report.split()[-1]
    except (AttributeError, IndexError):
        log.info("unexpected reply from master: %r" % (report,))
        return
    _follow_job_log(log_file, job_id)
   
#定义主函数设置相关运行目录和配置文件名
def main():
    """Job-submission entry point: ensure ./log and ./run exist, then submit task.ini."""
    homedir = os.getcwd()
    for target in (homedir + '/log', homedir + '/run'):
        if not os.path.exists(target):
            os.makedirs(target, 0o755)
    add_job('task.ini', 'log/oaos_add_job.log')
   
   
# Submit the job described by task.ini when run as a script, not on import.
if __name__ == "__main__":
    main()

任务配置文件示例如下

[master]
sock=tcp://192.168.4.194:7777
[ip_list]
ip=192.168.4.195,192.168.4.196,192.168.4.197,192.168.4.198
[job_info]
type=c
task=ls -rlt
cmdtimeout=10
jobtimeout=10
env=sh
fileserver=http://192.168.0.227/ser/
rundir=/root/


实例演示见 简易linux集中管理工具三


One Response


    还没有评论!
1  

Leave your comment

请留下您的姓名(*)

请输入正确的邮箱地址(*)

请输入你的评论(*)


感谢开源 © 2016. All rights reserved.&3Q Open Source&^_^赣ICP备15012863号-1^_^
乐于分享共同进步 KreativeThemes