Malware analysis 4: Work with VirusTotal API v3. Create your own Python script.
﷽
Hello, cybersecurity enthusiasts and white hackers!
This post is the result of my own research into how the VirusTotal API works.
about VirusTotal API
You have probably used https://virustotal.com more than once to check whether a binary contains malicious functionality, or to test your own developments.
The service has a free API. After registering an account, you can get an API key.
The free API has a limitation that is not critical for research purposes: 4 requests per minute.
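To stay under the 4 requests per minute limit, a script can throttle itself on the client side. Below is a minimal sketch of a sliding-window limiter. The `RateLimiter` class and its parameters are my own illustration and are not part of the VirusTotal API. The `clock` and `sleep` arguments exist only so the logic can be tested without really waiting.

```python
import time
from collections import deque


class RateLimiter:
    """Allow at most `max_calls` calls within any `period` seconds."""

    def __init__(self, max_calls=4, period=60.0,
                 clock=time.monotonic, sleep=time.sleep):
        self.max_calls = max_calls
        self.period = period
        self.clock = clock
        self.sleep = sleep
        self.calls = deque()  # timestamps of the most recent calls

    def wait(self):
        """Block until another call is allowed, then record it."""
        while True:
            now = self.clock()
            # forget calls that have already left the sliding window
            while self.calls and now - self.calls[0] >= self.period:
                self.calls.popleft()
            if len(self.calls) < self.max_calls:
                self.calls.append(now)
                return
            # sleep until the oldest recorded call leaves the window
            self.sleep(self.period - (now - self.calls[0]))
```

Calling `limiter.wait()` right before every `requests.get` or `requests.post` would be enough for a single-threaded script like the one in this post.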
The API documentation is listed in the links at the end of this post.
practical example
For practical purposes, I wrote a simple script that uploads a local file to VirusTotal (so far only files smaller than 32 MB) and reports the analysis result in numbers: how many AV engines consider it malicious.
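Since the script only handles files smaller than 32 MB, it can be useful to check the size before spending a request. This is a small sketch. The helper name `fits_upload_limit` is hypothetical, and treating "32 MB" as 32 * 1024 * 1024 bytes is my assumption, so check the API documentation for the exact limit.

```python
import os

# Assumption: "32 MB" is interpreted as 32 * 1024 * 1024 bytes.
MAX_UPLOAD_BYTES = 32 * 1024 * 1024


def fits_upload_limit(path, limit=MAX_UPLOAD_BYTES):
    """Return True if the file at `path` is smaller than `limit` bytes."""
    return os.path.getsize(path) < limit
```

A caller could print an error and exit early when this returns `False`, instead of waiting for the upload to fail.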
Let’s go. First, according to the documentation, we send our API key with every request:
#...
# for terminal colors
class Colors:
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    PURPLE = '\033[95m'
    ENDC = '\033[0m'

VT_API_KEY = "My VirusTotal API key"
#...
class VTScan:
    def __init__(self):
        # headers sent with every request to the API
        self.headers = {
            "x-apikey" : VT_API_KEY,
            "User-Agent" : "vtscan v.1.0",
            "Accept-Encoding" : "gzip, deflate",
        }
#...
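Hardcoding the key is fine for a demo, but it is easy to leak it when publishing the script. One possible alternative is to read it from the environment. This is only a sketch: the helper names `load_api_key` and `build_headers`, and the environment variable name `VT_API_KEY`, are my own choices and not something the API requires.

```python
import os


def load_api_key(env=os.environ, name="VT_API_KEY"):
    """Return the API key stored in the environment variable `name`."""
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError("set the " + name + " environment variable first")
    return key


def build_headers(api_key):
    """Build the same headers the script sends with every request."""
    return {
        "x-apikey": api_key,
        "User-Agent": "vtscan v.1.0",
        "Accept-Encoding": "gzip, deflate",
    }
```

With this, `self.headers = build_headers(load_api_key())` could replace the dictionary in `__init__`.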
Then I created a function that uploads a local file to VirusTotal:
class VTScan:
    #...
    def upload(self, malware_path):
        print (Colors.BLUE + "upload file: " + malware_path + "..." + Colors.ENDC)
        self.malware_path = malware_path
        upload_url = VT_API_URL + "files"
        # multipart form field "file": (file name, file object)
        files = {"file" : (
            os.path.basename(malware_path),
            open(os.path.abspath(malware_path), "rb"))
        }
        print (Colors.YELLOW + "upload to " + upload_url + Colors.ENDC)
        res = requests.post(upload_url, headers = self.headers, files = files)
        if res.status_code == 200:
            result = res.json()
            # the returned ID is used later to query the analysis
            self.file_id = result.get("data").get("id")
            print (Colors.YELLOW + self.file_id + Colors.ENDC)
            print (Colors.GREEN + "successfully upload PE file: OK" + Colors.ENDC)
        else:
            print (Colors.RED + "failed to upload PE file :(" + Colors.ENDC)
            print (Colors.RED + "status code: " + str(res.status_code) + Colors.ENDC)
            sys.exit()
    #...
Then I created a function that gets information about the analysis results:
    def analyse(self):
        print (Colors.BLUE + "get info about the results of analysis..." + Colors.ENDC)
        analysis_url = VT_API_URL + "analyses/" + self.file_id
        res = requests.get(analysis_url, headers = self.headers)
        if res.status_code == 200:
            result = res.json()
            status = result.get("data").get("attributes").get("status")
            if status == "completed":
                stats = result.get("data").get("attributes").get("stats")
                results = result.get("data").get("attributes").get("results")
                print (Colors.RED + "malicious: " + str(stats.get("malicious")) + Colors.ENDC)
                print (Colors.YELLOW + "undetected : " + str(stats.get("undetected")) + Colors.ENDC)
                print ()
                # print details only for engines that flagged the file
                for k in results:
                    if results[k].get("category") == "malicious":
                        print ("==================================================")
                        print (Colors.GREEN + results[k].get("engine_name") + Colors.ENDC)
                        print ("version : " + results[k].get("engine_version"))
                        print ("category : " + results[k].get("category"))
                        print ("result : " + Colors.RED + results[k].get("result") + Colors.ENDC)
                        print ("method : " + results[k].get("method"))
                        print ("update : " + results[k].get("engine_update"))
                        print ("==================================================")
                        print ()
                print (Colors.GREEN + "successfully analyse: OK" + Colors.ENDC)
                sys.exit()
            elif status == "queued":
                print (Colors.BLUE + "status QUEUED..." + Colors.ENDC)
                # fall back to looking the file up by its SHA-256 sum
                with open(os.path.abspath(self.malware_path), "rb") as f:
                    b = f.read()
                hashsum = hashlib.sha256(b).hexdigest()
                self.info(hashsum)
        else:
            print (Colors.RED + "failed to get results of analysis :(" + Colors.ENDC)
            print (Colors.RED + "status code: " + str(res.status_code) + Colors.ENDC)
            sys.exit()
Here, if the uploaded file has been analyzed and the result is ready, we print it to the console: how many engines in total consider our file malicious. If the file is still in the queue, we instead read the analysis results using the SHA-256 sum of our file as an identifier:
    def info(self, file_hash):
        print (Colors.BLUE + "get file info by ID: " + file_hash + Colors.ENDC)
        info_url = VT_API_URL + "files/" + file_hash
        res = requests.get(info_url, headers = self.headers)
        if res.status_code == 200:
            result = res.json()
            if result.get("data").get("attributes").get("last_analysis_results"):
                stats = result.get("data").get("attributes").get("last_analysis_stats")
                results = result.get("data").get("attributes").get("last_analysis_results")
                print (Colors.RED + "malicious: " + str(stats.get("malicious")) + Colors.ENDC)
                print (Colors.YELLOW + "undetected : " + str(stats.get("undetected")) + Colors.ENDC)
                print ()
                # print details only for engines that flagged the file
                for k in results:
                    if results[k].get("category") == "malicious":
                        print ("==================================================")
                        print (Colors.GREEN + results[k].get("engine_name") + Colors.ENDC)
                        print ("version : " + results[k].get("engine_version"))
                        print ("category : " + results[k].get("category"))
                        print ("result : " + Colors.RED + results[k].get("result") + Colors.ENDC)
                        print ("method : " + results[k].get("method"))
                        print ("update : " + results[k].get("engine_update"))
                        print ("==================================================")
                        print ()
                print (Colors.GREEN + "successfully analyse: OK" + Colors.ENDC)
                sys.exit()
            else:
                print (Colors.BLUE + "failed to analyse :(..." + Colors.ENDC)
        else:
            print (Colors.RED + "failed to get information :(" + Colors.ENDC)
            print (Colors.RED + "status code: " + str(res.status_code) + Colors.ENDC)
            sys.exit()
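The SHA-256 identifier used for the lookup can also be computed without loading the whole file into memory. Here is a minimal sketch using only `hashlib` from the standard library. The helper name `sha256_of_file` and the 1 MB chunk size are my own choices.

```python
import hashlib


def sha256_of_file(path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of a file, reading it chunk by chunk."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel stops when read() returns an empty bytes object
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The returned string could be passed to `info()` directly, for example `self.info(sha256_of_file(self.malware_path))`.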
So, the full source code of our tool is:
# upload PE file to VirusTotal
# then get info about the results
# of analysis, print if malicious
import os
import sys
import time
import json
import requests
import argparse
import hashlib

# for terminal colors
class Colors:
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    PURPLE = '\033[95m'
    ENDC = '\033[0m'

# VirusTotal API key
VT_API_KEY = "My VirusTotal API key"

# VirusTotal API v3 URL
VT_API_URL = "https://www.virustotal.com/api/v3/"

# upload malicious file to VirusTotal and analyse
class VTScan:
    def __init__(self):
        # headers sent with every request to the API
        self.headers = {
            "x-apikey" : VT_API_KEY,
            "User-Agent" : "vtscan v.1.0",
            "Accept-Encoding" : "gzip, deflate",
        }

    def upload(self, malware_path):
        print (Colors.BLUE + "upload file: " + malware_path + "..." + Colors.ENDC)
        self.malware_path = malware_path
        upload_url = VT_API_URL + "files"
        # multipart form field "file": (file name, file object)
        files = {"file" : (
            os.path.basename(malware_path),
            open(os.path.abspath(malware_path), "rb"))
        }
        print (Colors.YELLOW + "upload to " + upload_url + Colors.ENDC)
        res = requests.post(upload_url, headers = self.headers, files = files)
        if res.status_code == 200:
            result = res.json()
            # the returned ID is used later to query the analysis
            self.file_id = result.get("data").get("id")
            print (Colors.YELLOW + self.file_id + Colors.ENDC)
            print (Colors.GREEN + "successfully upload PE file: OK" + Colors.ENDC)
        else:
            print (Colors.RED + "failed to upload PE file :(" + Colors.ENDC)
            print (Colors.RED + "status code: " + str(res.status_code) + Colors.ENDC)
            sys.exit()

    def analyse(self):
        print (Colors.BLUE + "get info about the results of analysis..." + Colors.ENDC)
        analysis_url = VT_API_URL + "analyses/" + self.file_id
        res = requests.get(analysis_url, headers = self.headers)
        if res.status_code == 200:
            result = res.json()
            status = result.get("data").get("attributes").get("status")
            if status == "completed":
                stats = result.get("data").get("attributes").get("stats")
                results = result.get("data").get("attributes").get("results")
                print (Colors.RED + "malicious: " + str(stats.get("malicious")) + Colors.ENDC)
                print (Colors.YELLOW + "undetected : " + str(stats.get("undetected")) + Colors.ENDC)
                print ()
                # print details only for engines that flagged the file
                for k in results:
                    if results[k].get("category") == "malicious":
                        print ("==================================================")
                        print (Colors.GREEN + results[k].get("engine_name") + Colors.ENDC)
                        print ("version : " + results[k].get("engine_version"))
                        print ("category : " + results[k].get("category"))
                        print ("result : " + Colors.RED + results[k].get("result") + Colors.ENDC)
                        print ("method : " + results[k].get("method"))
                        print ("update : " + results[k].get("engine_update"))
                        print ("==================================================")
                        print ()
                print (Colors.GREEN + "successfully analyse: OK" + Colors.ENDC)
                sys.exit()
            elif status == "queued":
                print (Colors.BLUE + "status QUEUED..." + Colors.ENDC)
                # fall back to looking the file up by its SHA-256 sum
                with open(os.path.abspath(self.malware_path), "rb") as f:
                    b = f.read()
                hashsum = hashlib.sha256(b).hexdigest()
                self.info(hashsum)
        else:
            print (Colors.RED + "failed to get results of analysis :(" + Colors.ENDC)
            print (Colors.RED + "status code: " + str(res.status_code) + Colors.ENDC)
            sys.exit()

    def run(self, malware_path):
        self.upload(malware_path)
        self.analyse()

    def info(self, file_hash):
        print (Colors.BLUE + "get file info by ID: " + file_hash + Colors.ENDC)
        info_url = VT_API_URL + "files/" + file_hash
        res = requests.get(info_url, headers = self.headers)
        if res.status_code == 200:
            result = res.json()
            if result.get("data").get("attributes").get("last_analysis_results"):
                stats = result.get("data").get("attributes").get("last_analysis_stats")
                results = result.get("data").get("attributes").get("last_analysis_results")
                print (Colors.RED + "malicious: " + str(stats.get("malicious")) + Colors.ENDC)
                print (Colors.YELLOW + "undetected : " + str(stats.get("undetected")) + Colors.ENDC)
                print ()
                # print details only for engines that flagged the file
                for k in results:
                    if results[k].get("category") == "malicious":
                        print ("==================================================")
                        print (Colors.GREEN + results[k].get("engine_name") + Colors.ENDC)
                        print ("version : " + results[k].get("engine_version"))
                        print ("category : " + results[k].get("category"))
                        print ("result : " + Colors.RED + results[k].get("result") + Colors.ENDC)
                        print ("method : " + results[k].get("method"))
                        print ("update : " + results[k].get("engine_update"))
                        print ("==================================================")
                        print ()
                print (Colors.GREEN + "successfully analyse: OK" + Colors.ENDC)
                sys.exit()
            else:
                print (Colors.BLUE + "failed to analyse :(..." + Colors.ENDC)
        else:
            print (Colors.RED + "failed to get information :(" + Colors.ENDC)
            print (Colors.RED + "status code: " + str(res.status_code) + Colors.ENDC)
            sys.exit()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-m','--mal', required = True, help = "PE file path for scanning")
    args = vars(parser.parse_args())
    vtscan = VTScan()
    vtscan.run(args["mal"])
I don’t think retelling the documentation is a good idea, since everything is well written there.
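One thing that stands out in the script is that `analyse()` and `info()` repeat the same loop over per-engine results. A possible refactoring, shown here only as a sketch, is to pull the filtering into a pure function. The name `malicious_verdicts` is hypothetical. The only thing it relies on is what the script above already reads: each entry has `category`, `engine_name` and `result` keys.

```python
def malicious_verdicts(results):
    """Return (engine_name, result) pairs for engines that flagged the file.

    `results` is the per-engine dictionary the script reads from
    "results" or "last_analysis_results".
    """
    verdicts = []
    for engine in results.values():
        if engine.get("category") == "malicious":
            verdicts.append((engine.get("engine_name"), engine.get("result")))
    return verdicts
```

Both methods could then print the returned pairs, and the filtering logic becomes easy to test without any network access.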
demo
Let’s see everything in action. It’s pretty simple. First, compile the malware from one of my previous posts. It’s a classic process injection example:
x86_64-w64-mingw32-g++ hack.cpp -o hack.exe -mconsole -I/usr/share/mingw-w64/include/ -s -ffunction-sections -fdata-sections -Wno-write-strings -Wint-to-pointer-cast -fno-exceptions -fmerge-all-constants -static-libstdc++ -static-libgcc -fpermissive
Then run our Python script with the --mal hack.exe parameter:
python3 vtscan.py --mal hack.exe
As you can see, everything works perfectly.
The VirusTotal API allows you to do fancier and more custom things, and I think my script can serve as a starting point for your projects.
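As one example of such an extension: the script asks for the analysis status only once and falls back to the hash lookup when the status is "queued". A polling loop is another option. The sketch below is my own illustration, not part of the original tool. `get_status` is a hypothetical callable that would wrap the request to the analyses endpoint and return the status string. The 20 second delay is an assumption chosen to stay under 4 requests per minute.

```python
import time


def wait_until_completed(get_status, attempts=10, delay=20.0, sleep=time.sleep):
    """Call `get_status()` until it returns "completed" or attempts run out.

    Returns True if the analysis completed, False otherwise.
    """
    for attempt in range(attempts):
        if get_status() == "completed":
            return True
        # no need to wait after the final attempt
        if attempt < attempts - 1:
            sleep(delay)
    return False
```

The `sleep` parameter is injectable so the loop can be tested with a fake that does not actually wait.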
VirusTotal API v3 documentation
hack.exe in VirusTotal
Classic code injection technique
source code on Github
This is a practical case for educational purposes only.
Thanks for your time, happy hacking and goodbye!
PS. All drawings and screenshots are mine