Author: 容易 2015-12-18 15:07:37
All rights reserved. Please do not reproduce without permission.
I have spent a few days learning ELK and it is very good. For our actual business needs, however, Logstash is heavier than we need and its learning curve is relatively steep. To parse logs and write them into Elasticsearch simply and efficiently, I spent a little time writing a small parallel log-processing script in Python. The parallel-processing framework is already in place, so in theory only minor changes are needed to extract data from various databases, process it, and insert it into Elasticsearch. I am sharing it here in the hope that readers will not simply copy it as-is, but will feed back and share better and more efficient versions of the code. Thanks. My QQ is 46053710.
The script uses a web application access log as its example. It covers fetching GeoIP information and the geo_point mapping, handling Kibana's UTC timestamps, and keeping counts of the lines read and inserted. You can modify and adjust it to fit your own needs.
Also, when doing bulk inserts you need to adjust some of Elasticsearch's default settings; otherwise Elasticsearch may not be able to keep up and part of the bulk insert may fail. The settings below are the ones I adjusted for bulk inserting; tune them to your own situation (the index-level subset can also be applied at runtime, as shown in the sketch after the list).
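As a quick sanity check of the GeoIP lookup used below, here is a minimal sketch, assuming the GeoLiteCity.dat database sits at the same path the script uses; record_by_addr returns a dict (or None for an unknown IP) whose longitude and latitude fields later feed the geo_point location field.

import pygeoip

gi = pygeoip.GeoIP('/root/test/GeoLiteCity.dat', pygeoip.MEMORY_CACHE)
record = gi.record_by_addr('112.64.186.119')  # dict, or None if the IP is not in the database
if record:
    # 'longitude' and 'latitude' are the fields used for the geo_point "location" below
    print record['country_name'], record['city'], record['longitude'], record['latitude']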
index.store.type: niofs
index.cache.field.type: soft
bootstrap.mlockall: true
threadpool.index.size: 8
threadpool.index.queue_size: 1200
threadpool.search.size: 8
threadpool.search.queue_size: 1000
threadpool.get.type: fixed
threadpool.get.size: 8
threadpool.get.queue_size: 1000
threadpool.bulk.type: fixed
threadpool.bulk.size: 8
threadpool.bulk.queue_size: 1200
threadpool.flush.size: 8
threadpool.flush.queue_size: 1000
indices.store.throttle.type: merge
indices.store.throttle.max_bytes_per_sec: 100mb
index.merge.scheduler.max_thread_count: 16
index.translog.flush_threshold_size: 1024MB
index.gateway.local.sync: 30s
index.refresh_interval: 30s
index.translog.interval: 15s
index.translog.flush_threshold_ops: 50000
index.translog.flush_threshold_period: 30s
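The bootstrap.*, threadpool.*, and indices.* entries are node-level settings that belong in elasticsearch.yml and take effect after a restart, while the index.* entries can also be changed on a live index through the _settings API. A minimal sketch of the runtime route with requests, assuming the same host and index used in the script and the 1.x-era setting names listed above:

import json, requests

# relax refresh/translog settings on the target index before a bulk load
settings = {"index": {"refresh_interval": "30s",
                      "translog.flush_threshold_ops": 50000}}
r = requests.put('http://192.168.4.181:9200/pnr_netscaler-map-log/_settings',
                 data=json.dumps(settings))
print r.status_code, r.text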
After processing, the basic document written to Elasticsearch looks like this:
res = {"@timestamp": '2015-12-09T12:31:22.000+08:00',
       "remote_ip": '112.64.186.119',
       "rel_ip": '10.98.50.131',
       "rel_port": '8010',
       "domain": 'www.3qos.com',
       "v_ip": '10.98.3.162',
       "r_time": 30,
       "status": 200,
       "bytes": 128,
       "r_url": '/mas/',
       "r_method": 'GET',
       "user_agent": 'Apache-HttpClient/4.3.3 (java 1.5)',
       "geoip": geoip,
       "location": {"lon": geoip["longitude"], "lat": geoip["latitude"]}
       }
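The @timestamp value carries an explicit +08:00 offset, so Kibana, which works in UTC internally, still displays the original local time. A small sketch of the conversion the script performs on a sample access-log timestamp:

import datetime

# '09/Dec/2015:12:31:22' is the raw access-log time (leading '[' already stripped)
date = datetime.datetime.strptime('09/Dec/2015:12:31:22', '%d/%b/%Y:%H:%M:%S')
# keep the local value and attach the +08:00 offset; Elasticsearch normalizes it to UTC
print date.strftime('%Y-%m-%dT%H:%M:%S.000+08:00')  # -> 2015-12-09T12:31:22.000+08:00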
The script:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'tiger'

import multiprocessing
import time, pygeoip, re, sys
import datetime, requests, json
from elasticsearch import Elasticsearch
from elasticsearch import helpers

reload(sys)
sys.setdefaultencoding("utf8")
def read_file(data_queue, source_file, rsize, insert_process):
    # Producer: read the log file in chunks and push batches of lines onto the queue.
    total_count = 0
    try:
        stdin_log = open(source_file, 'r')
    except Exception, err_info:
        print err_info
        # Nothing to read, so tell every insert process to stop.
        for i in range(insert_process):
            data_queue.put('ok')
        return
    lines = stdin_log.readlines(rsize)
    while lines:
        total_count = total_count + len(lines)
        data_queue.put(lines)
        #print data_queue.qsize()
        lines = stdin_log.readlines(rsize)
    stdin_log.close()
    # One 'ok' sentinel per insert process so each consumer can exit its loop.
    for i in range(insert_process):
        data_queue.put('ok')
    print "read lines %s." % total_count
def insert_es(data_queue, es_info, index_name, type_name, complete_queue, geodb_path):
    try:
        es = Elasticsearch([es_info])
    except Exception, err:
        print "conn es error " + str(err)
    total_count = 0
    # Regex that captures the "..."-quoted fields (request line, referer, user agent).
    regex_str = re.compile(r'\B".*?"\B')
    # GeoIP database used to look up the client IP's location.
    gi = pygeoip.GeoIP(geodb_path, pygeoip.MEMORY_CACHE)
    while 1:
        res_list = data_queue.get()
        values_list = []
        if res_list == 'ok':
            break
        rows = len(res_list)
        error_rows = 0
        for line in res_list:
            # Charset handling: fall back to GB2312 for lines that are not valid UTF-8;
            # adjust this to your own data if needed.
            try:
                line = line.encode('utf-8')
            except:
                line = line.decode('GB2312', 'ignore').encode('utf-8')
            line_format = line.strip().split()
            try:
                r_time = line_format[9][1:]
                date = datetime.datetime.strptime(r_time, '%d/%b/%Y:%H:%M:%S')
                # Kibana works in UTC; keep the local time but attach the +08:00 offset.
                x = date.strftime('%Y-%m-%dT%H:%M:%S.000+08:00')
            except Exception, err:
                error_rows = error_rows + 1
                print "time " + str(err)
                continue
            res_info = regex_str.findall(line)
            try:
                geoip = gi.record_by_addr(line_format[0])
            except Exception, err:
                print str(err) + ' ip:' + str(line_format[0])
                geoip = 0
            try:
                url_method = res_info[0].replace('"', "").strip().split()
                res = {"@timestamp": x, "remote_ip": line_format[0],
                       "rel_ip": line_format[1], "rel_port": line_format[4],
                       "domain": line_format[2], "v_ip": line_format[3],
                       "r_time": int(line_format[5]), "status": int(line_format[14]),
                       "bytes": int(line_format[15]), "r_url": url_method[1].split('?')[0],
                       "r_method": url_method[0], "user_agent": res_info[2].replace('"', ""),
                       "referer_url": res_info[1].replace('"', "").strip().split('?')[0]}
            except Exception, err:
                error_rows = error_rows + 1
                print "res " + str(err)
                # Skip malformed lines instead of re-using the previous record.
                continue
            if geoip:
                res["geoip"] = geoip
                res["location"] = {"lon": geoip["longitude"], "lat": geoip["latitude"]}
            values_list.append({"_index": index_name, "_type": type_name, "_source": res})
        try:
            helpers.bulk(es, values_list)
            total_count = total_count + rows - error_rows
        except Exception, error_info:
            print "insert error " + str(error_info)
    complete_queue.put(total_count)
    print "insert process complete, will stop."
def main():
    # Absolute path of the log file to import.
    source_file = '/root/chinapnr151209.log.5'
    # Elasticsearch server IP (default port 9200).
    es_info = "192.168.4.181"
    # Target index and type for the bulk inserts.
    index_name = 'pnr_netscaler-map-log'
    type_name = 'maplog'
    # Whether the index already exists; 0 means it needs to be created with the geo_point mapping.
    index_create = 0
    # URL of the index that needs the mapping.
    url = 'http://192.168.4.181:9200/' + index_name
    # Chunk size for each read; e.g. 512 KB per read would be 1024 * 512 * 1.
    rsize = 1024 * 1024 * 1
    start_time = time.time()
    # Number of processes doing the bulk inserts.
    insert_process = 16
    # Total number of lines inserted.
    total_row = 0
    # Path to the GeoIP database.
    geodb = '/root/test/GeoLiteCity.dat'
    data_manager = multiprocessing.Manager()
    data_queue = data_manager.Queue(maxsize=40)
    complete_manager = multiprocessing.Manager()
    pro_pool = multiprocessing.Pool(processes=insert_process + 1)
    complete_queue = complete_manager.Queue(maxsize=insert_process)
    # Create the index with the geo_point mapping when it does not exist yet.
    if index_create == 0:
        try:
            #es_create = Elasticsearch([es_info])
            #es_create.indices.create(index=index_name, ignore=400)
            values = {"mappings": {"maplog": {"properties": {"location": {"type": "geo_point"}}}}}
            r = requests.put(url, data=json.dumps(values))
            print r
        except Exception, err:
            print err
    print "start insert es process."
    for i in xrange(insert_process):
        pro_pool.apply_async(insert_es, args=(data_queue, es_info, index_name, type_name, complete_queue, geodb))
    print "start read file process."
    pro_pool.apply_async(read_file, args=(data_queue, source_file, rsize, insert_process))
    pro_pool.close()
    pro_pool.join()
    while insert_process > 0:
        single_count = complete_queue.get()
        total_row = total_row + single_count
        insert_process = insert_process - 1
    print "insert lines:%s." % total_row
    elapsed_time = time.time() - start_time
    print "The elapsed time is %s seconds." % elapsed_time


if __name__ == "__main__":
    main()
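After a run, the "insert lines" total printed by the script can be cross-checked against the index. A quick sketch with the elasticsearch client, assuming the same host, index, and type as above:

from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.4.181"])
# document count in the target index, to compare with the "insert lines" total
print es.count(index="pnr_netscaler-map-log", doc_type="maplog")["count"]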