本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下:
import simplejson
import urllib
import urllib2
import os, sys
import logging
try:
import sqlite3
except ImportError:
sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. " \
"Please verify your installation. Exiting...\n")
sys.exit(-1)
MD5 = "5248f774d2ee0a10936d0b1dc89107f1"
MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6" #do not have report on virustotal.com
APIKEY = "xxxxxxxxxxxxxxxxxx"用自己的
class VirusTotalDatabase:
"""
Database abstraction layer.
"""
def __init__(self, db_file):
log = logging.getLogger("Database.Init")
self.__dbfile = db_file
self._conn = None
self._cursor = None
# Check if SQLite database already exists. If it doesn't exist I invoke
# the generation procedure.
if not os.path.exists(self.__dbfile):
if self._generate():
print("Generated database \"%s\" which didn't" \
" exist before." % self.__dbfile)
else:
print("Unable to generate database")
# Once the database is generated of it already has been, I can
# initialize the connection.
try:
self._conn = sqlite3.connect(self.__dbfile)
self._cursor = self._conn.cursor()
except Exception, why:
print("Unable to connect to database \"%s\": %s."
% (self.__dbfile, why))
log.debug("Connected to SQLite database \"%s\"." % self.__dbfile)
def _generate(self):
"""
Creates database structure in a SQLite file.
"""
if os.path.exists(self.__dbfile):
return False
db_dir = os.path.dirname(self.__dbfile)
if not os.path.exists(db_dir):
try:
os.makedirs(db_dir)
except (IOError, os.error), why:
print("Something went wrong while creating database " \
"directory \"%s\": %s" % (db_dir, why))
return False
conn = sqlite3.connect(self.__dbfile)
cursor = conn.cursor()
cursor.execute("CREATE TABLE virustotal (\n" \
" id INTEGER PRIMARY KEY,\n" \
" md5 TEXT NOT NULL,\n" \
" Kaspersky TEXT DEFAULT NULL,\n" \
" McAfee TEXT DEFAULT NULL,\n" \
" Symantec TEXT DEFAULT NULL,\n" \
" Norman TEXT DEFAULT NULL,\n" \
" Avast TEXT DEFAULT NULL,\n" \
" NOD32 TEXT DEFAULT NULL,\n" \
" BitDefender TEXT DEFAULT NULL,\n" \
" Microsoft TEXT DEFAULT NULL,\n" \
" Rising TEXT DEFAULT NULL,\n" \
" Panda TEXT DEFAULT NULL\n" \
");")
print "create db:%s sucess" % self.__dbfile
return True
def _get_task_dict(self, row):
try:
task = {}
task["id"] = row[0]
task["md5"] = row[1]
task["Kaspersky"] = row[2]
task["McAfee"] = row[3]
task["Symantec"] = row[4]
task["Norman"] = row[5]
task["Avast"] = row[6]
task["NOD32"] = row[7]
task["BitDefender"] = row[8]
task["Microsoft"] = row[9]
task["Rising"] = row[10]
task["Panda"] = row[11]
return task
except Exception, why:
return None
def add_sample(self, md5, virus_dict):
"""
"""
task_id = None
if not self._cursor:
return None
if not md5 or md5 == "":
return None
Kaspersky = virus_dict.get("Kaspersky", None)
McAfee = virus_dict.get("McAfee", None)
Symantec = virus_dict.get("Symantec", None)
Norman = virus_dict.get("Norman", None)
Avast = virus_dict.get("Avast", None)
NOD32 = virus_dict.get("NOD32", None)
BitDefender = virus_dict.get("BitDefender", None)
Microsoft = virus_dict.get("Microsoft", None)
Rising = virus_dict.get("Rising", None)
Panda = virus_dict.get("Panda", None)
self._conn.text_factory = str
try:
self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ",
(md5,))
sample_row = self._cursor.fetchone()
except sqlite3.OperationalError, why:
print "sqlite3 error:%s\n" % str(why)
return False
if sample_row:
try:
sample_row = sample_row[0]
self._cursor.execute("UPDATE virustotal SET Kaspersky=",
(Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, Microsoft,\
Rising, Panda, sample_row))
self._conn.commit()
task_id = sample_row
except sqlite3.OperationalError, why:
print("Unable to update database: %s." % why)
return False
else: #the sample not in the database
try:
self._cursor.execute("INSERT INTO virustotal " \
"(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender,\
Microsoft, Rising, Panda) " \
"VALUES (",
(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender,\
Microsoft, Rising, Panda))
self._conn.commit()
task_id = self._cursor.lastrowid
except sqlite3.OperationalError, why:
print "why",str(why)
return None
print "add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id))
return task_id
def get_sample(self):
"""
Gets a task from pending queue.
"""
log = logging.getLogger("Database.GetTask")
if not self._cursor:
log.error("Unable to acquire cursor.")
return None
# Select one item from the queue table with higher priority and older
# addition date which has not already been processed.
try:
self._cursor.execute("SELECT * FROM virustotal " \
#"WHERE lock = 0 " \
#"AND status = 0 " \
"ORDER BY id, added_on LIMIT 1;")
except sqlite3.OperationalError, why:
log.error("Unable to query database: %s." % why)
return None
sample_row = self._cursor.fetchone()
if sample_row:
return self._get_task_dict(sample_row)
else:
return None
def search_md5(self, md5):
"""
"""
if not self._cursor:
return None
if not md5 or len(md5) != 32:
return None
try:
self._cursor.execute("SELECT * FROM virustotal " \
"WHERE md5 = " \
#"AND status = 1 " \
"ORDER BY id DESC;",
(md5,))
except sqlite3.OperationalError, why:
return None
task_dict = {}
for row in self._cursor.fetchall():
task_dict = self._get_task_dict(row)
#if task_dict:
#tasks.append(task_dict)
return task_dict
class VirusTotal:
""""""
def __init__(self, md5):
"""Constructor"""
self._virus_dict = {}
self._md5 = md5
self._db_file = r"./db/virustotal.db"
self.get_report_dict()
def repr(self):
return str(self._virus_dict)
def submit_md5(self, file_path):
import postfile
#submit the file
FILE_NAME = os.path.basename(file_path)
host = "www.virustotal.com"
selector = "https://www.virustotal.com/vtapi/v2/file/scan"
fields = [("apikey", APIKEY)]
file_to_send = open(file_path, "rb").read()
files = [("file", FILE_NAME, file_to_send)]
json = postfile.post_multipart(host, selector, fields, files)
print json
pass
def get_report_dict(self):
result_dict = {}
url = "https://www.virustotal.com/vtapi/v2/file/report"
parameters = {"resource": self._md5,
"apikey": APIKEY}
data = urllib.urlencode(parameters)
req = urllib2.Request(url, data)
response = urllib2.urlopen(req)
json = response.read()
response_dict = simplejson.loads(json)
if response_dict["response_code"]: #has result
scans_dict = response_dict.get("scans", {})
for anti_virus_comany, virus_name in scans_dict.iteritems():
if virus_name["detected"]:
result_dict.setdefault(anti_virus_comany, virus_name["result"])
return result_dict
def write_to_db(self):
""""""
db = VirusTotalDatabase(self._db_file)
virus_dict = self.get_report_dict()
db.add_sample(self._md5, virus_dict)
使用方法如下:
config = {'input':"inputMd5s"}
fp = open(config['input'], "r")
content = fp.readlines()
MD5S = []
for md5 in ifilter(lambda x:len(x)>0, imap(string.strip, content)):
MD5S.append(md5)
print "MD5S",MD5S
fp.close()
from getVirusTotalInfo import VirusTotal
#得到扫描结果并写入数库
for md5 in MD5S:
virus_total = VirusTotal(md5)
virus_total.write_to_db()
希望本文所述对大家的Python程序设计有所帮助。
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件!
如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
白云城资源网 Copyright www.dyhadc.com
暂无“python实现封装得到virustotal扫描结果”评论...
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
2025年11月09日
2025年11月09日
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]