Masscan:一个快速的网络扫描工具,用于识别网络中活跃的主机。 Nmap:一个深入的网络分析工具,用于识别服务、版本信息以及潜在的漏洞。 ELK Stack:一个强大的日志分析和可视化平台,用于数据的存储和展示。
部署步骤如下:
安装Masscan及其依赖项,确保环境支持高效的网络扫描。
yum install git gcc make libpcap-devel
git clone https://github.com/robertdavidgraham/masscan
cd masscan
make
cp bin/masscan /bin
wget https://nmap.org/dist/nmap-7.95-2.x86_64.rpm
#释然IT杂谈
rpm -ivh nmap-7.95-2.x86_64.rpm
通过Docker快速启动Elasticsearch和Kibana服务,为数据管理和可视化提供支持。
docker run -d --name es -p 127.0.0.1:9201:9200 -p 9300:9300 -e ES_JAVA_OPTS="-Xms2G -Xmx2G" -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch-oss:7.1.1
docker run --name kibana -d -p 5601:5601 -e ELASTICSEARCH_HOSTS=http://127.0.0.1:9201 docker.elastic.co/kibana/kibana-oss:7.1.1
主机发现:使用Masscan对指定网络范围进行扫描,快速识别存活的主机。 详细扫描:对发现的存活主机使用Nmap进行详细扫描,并将结果以XML格式导出。 数据整合与可视化:将XML数据转换为JSON格式,导入到Elasticsearch中,并通过Kibana实现数据的可视化展示。
为提高效率,自动化脚本是关键:
利用多线程技术并行执行Masscan和Nmap扫描。 开发自定义函数将扫描结果从XML转换为JSON格式。 将转换后的数据索引到Elasticsearch中,并使用Kibana进行数据的实时监控和分析。(释然IT杂谈)
from elasticsearch import Elasticsearch
from settings import es_ip, es_port, ip_list
from utils import Logger
import xmltodict
import os
import threading
import time
import json
logger = Logger.get_logger(__name__, path=os.getcwd())
es = Elasticsearch([{'host': es_ip, 'port': es_port}])
def xml_to_json(path):
try:
with open(path, 'r') as load_f:
temp_ = xmltodict.parse(load_f).get("nmaprun")
return {key: temp_[key]
for key in temp_
if key not in ["verbose", 'scaninfo', 'taskbegin', 'taskend', "debugging"]}
except Exception as e:
logger.error(str(e)+path)
return {}
def json_to_es(index, json_):
try:
es.index(index=index, doc_type="vuln", body=json_)
except Exception as e:
try:
es.index(index=index + '_', doc_type="vuln", body=json.dumps(json_))
except Exception as e:
try:
es.index(index=index + '__', doc_type="vuln", body=json.dumps(json_))
except Exception as e:
logger.error(str(e))
pass
def masscan_scan(ip):
try:
if ip:
if not os.path.exists('tmp'):
os.makedirs('tmp')
os.system('masscan --ping {0} --rate 1000 -oL tmp/{1}.txt'.format(ip, ip.split('/')[0]))
except Exception as e:
logger.error(str(e))
def nmap_scan(ip):
try:
if ip:
if not os.path.exists('report'):
os.makedirs('report')
os.system('nmap -sV -Pn -A -T5 --script=nmap-vulners/vulners.nse -oX report/{0}.xml {1}'.format(ip, ip))
except Exception as e:
logger.error(str(e))
def masscan_scan_worker():
#释然IT杂谈
t_obj = []
for i in range(len(ip_list)):
t = threading.Thread(target=masscan_scan, args=(ip_list[i],))
t_obj.append(t)
t.start()
for t in t_obj:
t.join()
if not os.path.exists('alive_host'):
os.makedirs('alive_host')
cmd = """ awk '{print $4}' tmp/*.txt | tr -s '\n' > alive_host/host.txt """
os.system(cmd)
os.system("""rm -f tmp/*.txt""")
def read_txt(path):
lines = []
try:
with open(path, 'r') as file_to_read:
while True:
line = file_to_read.readline()
if not line:
break
line = line.strip('\n')
lines.append(line)
except Exception as e:
logger.error(str(e))
return lines
def nmap_scan_worker():
lst = read_txt('alive_host/host.txt')
tmp_ = [lst[i:i+5] for i in range(0, len(lst), 5)]
print tmp_
if tmp_:
for list_ in tmp_:
t_obj = []
for i in range(len(list_)):
t = threading.Thread(target=nmap_scan, args=(list_[i],))
t_obj.append(t)
t.start()
for t in t_obj:
t.join()
def nmap_to_es(index):
if os.path.exists('report'):
files = os.listdir('report')
for file in files:
json_to_es(index, xml_to_json('report'+'/'+file))
os.system("""rm -f report/*.xml""")
if __name__ == '__main__':
while True:
now = time.strftime('%Y-%m-%d')
masscan_scan_worker()
nmap_scan_worker()
es.indices.delete('nmap-*')
nmap_to_es('nmap-' + now)
完整代码:
https://github.com/njcx/nmap_to_es.git
最终,通过Kibana Dashboard,用户可以直观地查看内网资产的状态,包括但不限于开放端口、运行的服务及其版本信息,以及可能存在的安全漏洞。