了解过不少linux集中管理工具,基本原理都差不太多,只是复杂度和安装难度不同而已,本人习惯一切从简也喜欢玩,自己写了个简单的。不追求一次性管理成千上万台服务器,简单的管理1000台以内的服务器因该问题不大。基本原则安装简单,管理简单,功能简单满足日常基本需求即可。其他的安全和别的问题,大家可以自己去折腾。

   该工具实现了2个基本的功能,一个是集中发布命令,另外一个是集中发布脚本,脚本发布需要一个http服务器提供客户端下载相关脚本。linux集中管理基本也是基于这2条,别的就根据各自喜欢自己去增加吧。

   基本思路master监听来自客户端的请求。正常的请求分为以下几个类别

1、新任务,即需要发布的批量任务

2、客户端任务请求,即属于该客户端的任务

3、客户端任务报告,来次客户端的任务反馈


如图

   

   

   

   服务端master负责任务发布,以及任务监控,客户端与服务端建立长连接,每隔1秒左右会请求master询问是否有属于自己的任务,这样做很傻,但是简单,服务器不需要去做太复杂的连接管理,目前服务器还是属于阻塞连接,后续有机会改改。

   大概原理就这样,其他的就不说太多废话了。

   使用到的软件和包python+pyzmq,系统命令有wget和tail

   系统环境需要安装以下编译所需的编译环境

   小工具的安装步骤

mkdir /oaos

tar -zxvf Python-2.7.9.tgz
cd Python-2.7.9
./configure --prefix=/oaos/python
make && make install
tar  -zxvf setuptools-12.1.tar.gz
cd setuptools-12.1
/oaos/python/bin/python setup.py build
/oaos/python/bin/python setup.py install
unzip pyzmq-2.2.0.1.zip
cd pyzmq-2.2.0.1
/oaos/python/bin/python setup.py build
/oaos/python/bin/python setup.py install


master端的代码:

#-*- coding: UTF-8 -*-
__author__ = 'tiger'
#!/usr/bin/env python
import zmq, time, sys, os, atexit
from signal import SIGTERM
from threading import Thread
from Queue import Queue
import logging
from logging.handlers import RotatingFileHandler
   
#定义日志函数
def mylog(logfile):
    rthandler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3)
    formatter = logging.Formatter(
        '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s %(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S')
    rthandler.setFormatter(formatter)
    log = logging.getLogger()
    log.setLevel(logging.INFO)
    log.addHandler(rthandler)
    return log
   
#任务监控函数
def job_monitor(job_queue, add_queue, homedir, log):
    #ob_start,是代表job id号。
    #ip_list,是新任务涉及的所有IP地址列表
    global job_start
    global ip_list
    #任务报告会写到该目录下以job id生成的log文件中,每个任务会单独生成一个
    job_path = homedir + '/' + 'job/'
    while 1:
        #判断是否有新的任务,如果有执行后续的操作,没有将被阻塞一直到有新任务为止
        job_info = add_queue.get()
        #定义变量,job_status存放批量任务所以客户端的响应信息和状态
        job_status = {}
        #日志文件的绝对路径和日志文件名
        job_log = job_path + str(job_start) + '.log'
        #进行日志写操作,标记任务的开始
        file_object = open(job_log, 'w+')
        file_object.write('job ' + str(job_start)+ ' start\n')
        file_object.flush()
        job_status.update(job_info['iplist'])
        job_objects_report = 0
        log.info('job_start status:%s' % job_start)
        #新任务开始时,job_start做为全局变量会自动标记为job id,否则为0
        while job_start:
            #时间监控,判断任务超时使用
            end_time = time.time()
            #尝试获取任务的返回结果,采用非阻塞方式,如果没有任务报告,将会等待1秒后继续尝试一直到任务超时。
            try:
                ip_info = job_queue.get_nowait()
                #有任务报告时,首先判断任务的ID是否与这次的任务ID一致,如果一致写入日志,并且更新对应的client的响应状态。
                if ip_info['id'] == job_start:
                    job_objects_report = job_objects_report + 1
                    try:
                        job_status[ip_info['ip']]= [ip_info['code'], ip_info['info']]
                        log.info(ip_info)
                        file_object.write(ip_info['ip'] + ' code:' + str(ip_info['code']) + '\n')
                        file_object.write(ip_info['info'].strip())
                        file_object.write('\n')
                        file_object.flush()
                    except Exception, err:
                        log.warn(str(err))
                else:
                    continue
                #如果全部客户端都响应了任务请求,提示任务结束刷新相关任务状态。
                if job_objects_report >= job_info['count']:
                    log.info('job commpletel,job status:' + str(job_status))
                    file_object.write('job ' + str(job_start) + ' end\n')
                    file_object.close()
                    job_objects_report = 0
                    job_status = {}
                    job_start = 0
            #假如获取任务报告,失败时,判断超时状态,如果超时了,结束本次任务,刷新相关状态信息,跳出循环监听新的任务。
            except:
                if (end_time - job_info['start_time']) >= job_info['jobtimeout']:
                    #job_status.update(ip_list)
                    log.info('job_timeout,job status info:' + str(job_status))
                    #如果客户端没有返回任务报告,客户端对应的任务状态为N
                    for key, value in job_status.items():
                        if value == 'N':
                            file_object.write(key+ ' code:N\n')
                    #将相关信息写入日志,刷新任务的状态信息,推出循环
                    file_object.write('job ' + str(job_start) + ' has error please check.\n')
                    file_object.write('job ' + str(job_start) + ' end\n')
                    file_object.close()
                    ip_list = {}
                    job_status = {}
                    job_objects_report = 0
                    job_start = 0
                else:
                    time.sleep(1)
                continue
   
#负责任务接收,发布以及与客户端的连接
def ioloop(sock_file, job_queue, add_queue, log):
    #job_command定义任务的具体信息
    job_command = {}
    #job_start定义的是任务的ID,没有任务时job_start为0
    global job_start
    #定义的是新任务所以服务器地址列表
    global ip_list
    #创建监听
    context = zmq.Context()
    socks = context.socket(zmq.REP)
    socks.bind(sock_file)
    while 1:
        #获取客户端请求信息。
        req = socks.recv_pyobj()
        #获取客户端请求的类别
        try:
            req_type = req['req_type']
        except Exception, err:
            socks.send_pyobj({'rep_type':'N'})
            log.info(str(err))
            continue
        #如果是task类别,代表是客户端任务请求,并且获取客户端的IP信息
        if req_type == 'task':
            try:
                req_ip = req['ip']
            except Exception, err:
                log.info(str(err))
                socks.send_pyobj({'rep_type':'N'})
                continue
            #如果客户端请求的ip地址,在任务列表中,将需要执行的任务信息发送给客户端,否则提示没有相关任务
            if req_ip in ip_list.keys():
                ip_list.pop(req_ip)
                socks.send_pyobj(job_command)
            else:
                socks.send_pyobj({'rep_type':'N'})
        #如果是report,代表是客户端的任务响应报告
        elif req_type == 'report':
        #判断job返回状态信息,列表列表,包括job_ib,job_code(0代表ok,其他代表失败)
        #如果响应的id号等于本次执行的任务id,将报告信息通过queue传递到任务监控线程进行处理,其他的忽略。
            try:
                if req['id'] == job_start:
                    job_queue.put(req)
                    socks.send_pyobj({'rep_type':'report','id':req['id'],'status':'ok'})
                else:
                    socks.send_pyobj({'rep_type':'N'})
                    log.info('report time out.' +str(req))
            except Exception, err:
                log.info('add report queue error:' + str(err))
                socks.send_pyobj({'rep_type':'N'})
        #增加任务,当job_start!=0时,代表有任务执行,不允许添加新任务,后续改进
        #信任务的信息为字典对象,job_info代表任务的具体信息,例如需要执行的命令以及相关超时信息和任务ID等,
        #iplist代表需要执行本次任务的所有客户端IP
        #如下示例
        #ip_list={'192.168.4.195':'N','192.168.4.196':'N','192.168.4.194':'N'}
        #{'id': job_id, 'type': job_type, 'task': job_task, 'jobtimeout': job_jobtimeout,
        #            'cmdtimeout': job_cmdtimeout, 'env': job_env, 'fileserver': job_fileserver, 'rep_type': 'newtask',
        #            'rundir': job_rundir}
        elif req_type == 'addjob':
            if job_start == 0:
                try:
                    job_command = req['job_info']
                    job_start = job_command['id']
                    ip_list = req['iplist']
                    job_info = {}
                    job_info['start_time'] = time.time()
                    job_info['iplist'] = req['iplist']
                    job_info['count'] = len(req['iplist'])
                    job_info['jobtimeout'] = job_command['jobtimeout']
                    job_info['id'] = job_command['id']
                    job_info['iplist'] = req['iplist']
                    add_queue.put(job_info)
                    socks.send_pyobj('job add ok,job log job/' + str(job_info['id']) + '.log')
                    log.info('job add ok,iplist:'+ str(ip_list) + ',job_command:' + str(job_command))
                except Exception, err:
                    socks.send_pyobj('job add error ' + str(err))
                    log.warn(str(err))
            elif job_start != 0:
                socks.send_pyobj('job add error,job running.')
                log.info('job add error,job running.')
            else:
                socks.send_pyobj('job add error,parameter error.')
                log.info('job add error,parameter error.')
        else:
            socks.send_pyobj({'rep_type':'N'})
   
#检查IP地址是否正常,暂时未使用
def ip_check(ip):
    q = ip.split('.')
    return len(q) == 4 and len(filter(lambda x: x >= 0 and x <= 255, \
                                      map(int, filter(lambda x: x.isdigit(), q)))) == 4
   
#将master启动为守护进程
class Daemon:
    def __init__(self, pidfile, homedir, sock_file, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
        self.homedir = homedir
        self.sock_file = sock_file
   
    def _daemonize(self):
   
        #脱离父进程
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError, e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
            #脱离终端
        os.setsid()
        #修改当前工作目录
        os.chdir("/")
        #重设文件创建权限
        os.umask(0)
        #第二次fork,禁止进程重新打开控制终端
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError, e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        sys.stdout.flush()
        sys.stderr.flush()
        si = file(self.stdin, 'r')
        so = file(self.stdout, 'a+')
        se = file(self.stderr, 'a+', 0)
        #重定向标准输入/输出/错误
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())
        #注册程序退出时的函数,即删掉pid文件
        atexit.register(self.delpid)
        pid = str(os.getpid())
        file(self.pidfile, 'w+').write("%s\n" % pid)
   
    def delpid(self):
        os.remove(self.pidfile)
   
    def start(self):
        # Check for a pidfile to see if the daemon already runs
        try:
            pf = file(self.pidfile, 'r')
            pid = int(pf.read().strip())
            pf.close()
        except IOError:
            pid = None
        if pid:
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            sys.exit(1)
            # Start the daemon
        self._daemonize()
        self._run()
   
    def stop(self):
        # Get the pid from the pidfile
        try:
            pf = file(self.pidfile, 'r')
            pid = int(pf.read().strip())
            pf.close()
        except IOError:
            pid = None
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return # not an error in a restart
            # Try killing the daemon process
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError, err:
            err = str(err)
            if err.find("No such process") > 0:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print str(err)
                sys.exit(1)
   
    def restart(self):
        self.stop()
        self.start()
   
    def _run(self):
        pass
   
#继承Daemon重写_run函数实现自己的守护进程
class MyDaemon(Daemon):
    def _run(self, ):
        #定义日志函数
        log = mylog(self.stdout)
        global job_start
        global ip_list
        job_start = 0
        ip_list = {}
        #定义任务Q,job_queue用来与任务监控线程传递客户端的任务报告
        job_queue = Queue(maxsize=500)
        #用来告诉任务监控线程有新的任务
        add_queue = Queue(maxsize=5)
        #启动任务监控线程
        worker = Thread(target=job_monitor, args=(job_queue, add_queue, self.homedir, log))
        worker.setDaemon(True)
        worker.start()
        #启动任务监听服务,负责处理来次监控处理进程的请求,并且将监控对象的信息返回给监控进程处理。
        ioloop(self.sock_file, job_queue, add_queue, log)
   
#程序的启动入口
def main():
    #定义了程序所需的相关目录,日志文件名等信息
    homedir = os.getcwd()
    for i in ('log', 'run', 'job'):
        path = homedir + '/' + i
        if not os.path.exists(path):
            os.makedirs(path, 0755)
    stdout = homedir + '/log' + '/oaos_master.log'
    stderr = homedir + '/log' + '/oaos_master.err'
    pidfile = homedir + '/run' + '/oaos_master.pid'
    #定义了监听端口
    sock_file = "tcp://192.168.4.194:7777"
    daemon = MyDaemon(pidfile, homedir, sock_file, stdout=stdout, stderr=stderr)
    if len(sys.argv) == 2:
        if 'start' == sys.argv[1]:
            daemon.start()
        elif 'stop' == sys.argv[1]:
            daemon.stop()
        elif 'restart' == sys.argv[1]:
            daemon.restart()
        else:
            print "Unknown command"
            sys.exit(2)
        sys.exit(0)
    else:
        print "usage: %s start|stop|restart" % sys.argv[0]
        sys.exit(2)
   
   
if __name__ == "__main__":
    main()

客户端与增加任务的接口见简易linux集中管理工具二




One Response


    还没有评论!
1  

Leave your comment

请留下您的姓名(*)

请输入正确的邮箱地址(*)

请输入你的评论(*)


感谢开源 © 2016. All rights reserved.&3Q Open Source&^_^赣ICP备15012863号-1^_^
乐于分享共同进步 KreativeThemes