了解过不少linux集中管理工具,基本原理都差不太多,只是复杂度和安装难度不同而已,本人习惯一切从简也喜欢玩,自己写了个简单的。不追求一次性管理成千上万台服务器,简单的管理1000台以内的服务器因该问题不大。基本原则安装简单,管理简单,功能简单满足日常基本需求即可。其他的安全和别的问题,大家可以自己去折腾。
该工具实现了2个基本的功能,一个是集中发布命令,另外一个是集中发布脚本,脚本发布需要一个http服务器提供客户端下载相关脚本。linux集中管理基本也是基于这2条,别的就根据各自喜欢自己去增加吧。
基本思路master监听来自客户端的请求。正常的请求分为以下几个类别
1、新任务,即需要发布的批量任务
2、客户端任务请求,即属于该客户端的任务
3、客户端任务报告,来次客户端的任务反馈
如图
服务端master负责任务发布,以及任务监控,客户端与服务端建立长连接,每隔1秒左右会请求master询问是否有属于自己的任务,这样做很傻,但是简单,服务器不需要去做太复杂的连接管理,目前服务器还是属于阻塞连接,后续有机会改改。
大概原理就这样,其他的就不说太多废话了。
使用到的软件和包python+pyzmq,系统命令有wget和tail
系统环境需要安装以下编译所需的编译环境
小工具的安装步骤
mkdir /oaos
tar -zxvf Python-2.7.9.tgz
cd Python-2.7.9
./configure --prefix=/oaos/python
make && make install
tar -zxvf setuptools-12.1.tar.gz
cd setuptools-12.1
/oaos/python/bin/python setup.py build
/oaos/python/bin/python setup.py install
unzip pyzmq-2.2.0.1.zip
cd pyzmq-2.2.0.1
/oaos/python/bin/python setup.py build
/oaos/python/bin/python setup.py install
master端的代码:
#-*- coding: UTF-8 -*- __author__ = 'tiger' #!/usr/bin/env python import zmq, time, sys, os, atexit from signal import SIGTERM from threading import Thread from Queue import Queue import logging from logging.handlers import RotatingFileHandler #定义日志函数 def mylog(logfile): rthandler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3) formatter = logging.Formatter( '%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s %(asctime)s %(filename)s[line:%(lineno)d] %(message)s', datefmt='%a, %d %b %Y %H:%M:%S') rthandler.setFormatter(formatter) log = logging.getLogger() log.setLevel(logging.INFO) log.addHandler(rthandler) return log #任务监控函数 def job_monitor(job_queue, add_queue, homedir, log): #ob_start,是代表job id号。 #ip_list,是新任务涉及的所有IP地址列表 global job_start global ip_list #任务报告会写到该目录下以job id生成的log文件中,每个任务会单独生成一个 job_path = homedir + '/' + 'job/' while 1: #判断是否有新的任务,如果有执行后续的操作,没有将被阻塞一直到有新任务为止 job_info = add_queue.get() #定义变量,job_status存放批量任务所以客户端的响应信息和状态 job_status = {} #日志文件的绝对路径和日志文件名 job_log = job_path + str(job_start) + '.log' #进行日志写操作,标记任务的开始 file_object = open(job_log, 'w+') file_object.write('job ' + str(job_start)+ ' start\n') file_object.flush() job_status.update(job_info['iplist']) job_objects_report = 0 log.info('job_start status:%s' % job_start) #新任务开始时,job_start做为全局变量会自动标记为job id,否则为0 while job_start: #时间监控,判断任务超时使用 end_time = time.time() #尝试获取任务的返回结果,采用非阻塞方式,如果没有任务报告,将会等待1秒后继续尝试一直到任务超时。 try: ip_info = job_queue.get_nowait() #有任务报告时,首先判断任务的ID是否与这次的任务ID一致,如果一致写入日志,并且更新对应的client的响应状态。 if ip_info['id'] == job_start: job_objects_report = job_objects_report + 1 try: job_status[ip_info['ip']]= [ip_info['code'], ip_info['info']] log.info(ip_info) file_object.write(ip_info['ip'] + ' code:' + str(ip_info['code']) + '\n') file_object.write(ip_info['info'].strip()) file_object.write('\n') file_object.flush() except Exception, err: log.warn(str(err)) else: continue #如果全部客户端都响应了任务请求,提示任务结束刷新相关任务状态。 if job_objects_report >= job_info['count']: log.info('job commpletel,job status:' + str(job_status)) file_object.write('job ' + str(job_start) + ' end\n') file_object.close() job_objects_report = 0 job_status = {} job_start = 0 #假如获取任务报告,失败时,判断超时状态,如果超时了,结束本次任务,刷新相关状态信息,跳出循环监听新的任务。 except: if (end_time - job_info['start_time']) >= job_info['jobtimeout']: #job_status.update(ip_list) log.info('job_timeout,job status info:' + str(job_status)) #如果客户端没有返回任务报告,客户端对应的任务状态为N for key, value in job_status.items(): if value == 'N': file_object.write(key+ ' code:N\n') #将相关信息写入日志,刷新任务的状态信息,推出循环 file_object.write('job ' + str(job_start) + ' has error please check.\n') file_object.write('job ' + str(job_start) + ' end\n') file_object.close() ip_list = {} job_status = {} job_objects_report = 0 job_start = 0 else: time.sleep(1) continue #负责任务接收,发布以及与客户端的连接 def ioloop(sock_file, job_queue, add_queue, log): #job_command定义任务的具体信息 job_command = {} #job_start定义的是任务的ID,没有任务时job_start为0 global job_start #定义的是新任务所以服务器地址列表 global ip_list #创建监听 context = zmq.Context() socks = context.socket(zmq.REP) socks.bind(sock_file) while 1: #获取客户端请求信息。 req = socks.recv_pyobj() #获取客户端请求的类别 try: req_type = req['req_type'] except Exception, err: socks.send_pyobj({'rep_type':'N'}) log.info(str(err)) continue #如果是task类别,代表是客户端任务请求,并且获取客户端的IP信息 if req_type == 'task': try: req_ip = req['ip'] except Exception, err: log.info(str(err)) socks.send_pyobj({'rep_type':'N'}) continue #如果客户端请求的ip地址,在任务列表中,将需要执行的任务信息发送给客户端,否则提示没有相关任务 if req_ip in ip_list.keys(): ip_list.pop(req_ip) socks.send_pyobj(job_command) else: socks.send_pyobj({'rep_type':'N'}) #如果是report,代表是客户端的任务响应报告 elif req_type == 'report': #判断job返回状态信息,列表列表,包括job_ib,job_code(0代表ok,其他代表失败) #如果响应的id号等于本次执行的任务id,将报告信息通过queue传递到任务监控线程进行处理,其他的忽略。 try: if req['id'] == job_start: job_queue.put(req) socks.send_pyobj({'rep_type':'report','id':req['id'],'status':'ok'}) else: socks.send_pyobj({'rep_type':'N'}) log.info('report time out.' +str(req)) except Exception, err: log.info('add report queue error:' + str(err)) socks.send_pyobj({'rep_type':'N'}) #增加任务,当job_start!=0时,代表有任务执行,不允许添加新任务,后续改进 #信任务的信息为字典对象,job_info代表任务的具体信息,例如需要执行的命令以及相关超时信息和任务ID等, #iplist代表需要执行本次任务的所有客户端IP #如下示例 #ip_list={'192.168.4.195':'N','192.168.4.196':'N','192.168.4.194':'N'} #{'id': job_id, 'type': job_type, 'task': job_task, 'jobtimeout': job_jobtimeout, # 'cmdtimeout': job_cmdtimeout, 'env': job_env, 'fileserver': job_fileserver, 'rep_type': 'newtask', # 'rundir': job_rundir} elif req_type == 'addjob': if job_start == 0: try: job_command = req['job_info'] job_start = job_command['id'] ip_list = req['iplist'] job_info = {} job_info['start_time'] = time.time() job_info['iplist'] = req['iplist'] job_info['count'] = len(req['iplist']) job_info['jobtimeout'] = job_command['jobtimeout'] job_info['id'] = job_command['id'] job_info['iplist'] = req['iplist'] add_queue.put(job_info) socks.send_pyobj('job add ok,job log job/' + str(job_info['id']) + '.log') log.info('job add ok,iplist:'+ str(ip_list) + ',job_command:' + str(job_command)) except Exception, err: socks.send_pyobj('job add error ' + str(err)) log.warn(str(err)) elif job_start != 0: socks.send_pyobj('job add error,job running.') log.info('job add error,job running.') else: socks.send_pyobj('job add error,parameter error.') log.info('job add error,parameter error.') else: socks.send_pyobj({'rep_type':'N'}) #检查IP地址是否正常,暂时未使用 def ip_check(ip): q = ip.split('.') return len(q) == 4 and len(filter(lambda x: x >= 0 and x <= 255, \ map(int, filter(lambda x: x.isdigit(), q)))) == 4 #将master启动为守护进程 class Daemon: def __init__(self, pidfile, homedir, sock_file, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): self.stdin = stdin self.stdout = stdout self.stderr = stderr self.pidfile = pidfile self.homedir = homedir self.sock_file = sock_file def _daemonize(self): #脱离父进程 try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) #脱离终端 os.setsid() #修改当前工作目录 os.chdir("/") #重设文件创建权限 os.umask(0) #第二次fork,禁止进程重新打开控制终端 try: pid = os.fork() if pid > 0: sys.exit(0) except OSError, e: sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) sys.exit(1) sys.stdout.flush() sys.stderr.flush() si = file(self.stdin, 'r') so = file(self.stdout, 'a+') se = file(self.stderr, 'a+', 0) #重定向标准输入/输出/错误 os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) #注册程序退出时的函数,即删掉pid文件 atexit.register(self.delpid) pid = str(os.getpid()) file(self.pidfile, 'w+').write("%s\n" % pid) def delpid(self): os.remove(self.pidfile) def start(self): # Check for a pidfile to see if the daemon already runs try: pf = file(self.pidfile, 'r') pid = int(pf.read().strip()) pf.close() except IOError: pid = None if pid: message = "pidfile %s already exist. Daemon already running?\n" sys.stderr.write(message % self.pidfile) sys.exit(1) # Start the daemon self._daemonize() self._run() def stop(self): # Get the pid from the pidfile try: pf = file(self.pidfile, 'r') pid = int(pf.read().strip()) pf.close() except IOError: pid = None if not pid: message = "pidfile %s does not exist. Daemon not running?\n" sys.stderr.write(message % self.pidfile) return # not an error in a restart # Try killing the daemon process try: while 1: os.kill(pid, SIGTERM) time.sleep(0.1) except OSError, err: err = str(err) if err.find("No such process") > 0: if os.path.exists(self.pidfile): os.remove(self.pidfile) else: print str(err) sys.exit(1) def restart(self): self.stop() self.start() def _run(self): pass #继承Daemon重写_run函数实现自己的守护进程 class MyDaemon(Daemon): def _run(self, ): #定义日志函数 log = mylog(self.stdout) global job_start global ip_list job_start = 0 ip_list = {} #定义任务Q,job_queue用来与任务监控线程传递客户端的任务报告 job_queue = Queue(maxsize=500) #用来告诉任务监控线程有新的任务 add_queue = Queue(maxsize=5) #启动任务监控线程 worker = Thread(target=job_monitor, args=(job_queue, add_queue, self.homedir, log)) worker.setDaemon(True) worker.start() #启动任务监听服务,负责处理来次监控处理进程的请求,并且将监控对象的信息返回给监控进程处理。 ioloop(self.sock_file, job_queue, add_queue, log) #程序的启动入口 def main(): #定义了程序所需的相关目录,日志文件名等信息 homedir = os.getcwd() for i in ('log', 'run', 'job'): path = homedir + '/' + i if not os.path.exists(path): os.makedirs(path, 0755) stdout = homedir + '/log' + '/oaos_master.log' stderr = homedir + '/log' + '/oaos_master.err' pidfile = homedir + '/run' + '/oaos_master.pid' #定义了监听端口 sock_file = "tcp://192.168.4.194:7777" daemon = MyDaemon(pidfile, homedir, sock_file, stdout=stdout, stderr=stderr) if len(sys.argv) == 2: if 'start' == sys.argv[1]: daemon.start() elif 'stop' == sys.argv[1]: daemon.stop() elif 'restart' == sys.argv[1]: daemon.restart() else: print "Unknown command" sys.exit(2) sys.exit(0) else: print "usage: %s start|stop|restart" % sys.argv[0] sys.exit(2) if __name__ == "__main__": main()
客户端与增加任务的接口见简易linux集中管理工具二
One Response