3-Step RDP Honeypot: Step 3 | Build the Bot

Chapin Bryce
Published in Pythonic Forensics
8 min read · Feb 15, 2020

If you are joining the series here, please consider starting with the Introduction and working through steps 1 and 2, as they cover the objectives and setting up pre-requisites for this phase of the honeypot design.

Alright, we have set up a Honeypot listening for RDP traffic and capturing full PCAPs; then we stood up Elasticsearch and Moloch for processing our PCAP data and extracting key features. With Maxmind integration, Moloch has even provided enhancement of IP address information, providing more value for us on geolocation and ASN organization details.

With this rich data source, we can start operationalizing this intelligence. This can mean different things to different people, and we are going to demonstrate one approach: a Twitter and Pastebin bot to share summary stats of the data collected over the prior 24 hours. You may find a more useful application for this information, though please use this post as inspiration!

Lastly, just because one bot (RDPSnitch) already exists does not mean more are not needed. Feel free to follow this guide and build a clone of this bot — as an industry, we can only benefit from more visibility!

Twitter API Access

First we will create our Twitter bot account and request API access. This process can take some time, as Twitter must review and approve the application. For this reason, we will start this phase early with the below link:

Once we have been approved, we will need to collect the following details for use in our script:

  • Twitter API Key
  • Twitter Secret
  • Twitter Access Token
  • Twitter Access Secret

While our application is pending, we can start on the next steps, though we will need the above four API components to complete the Twitter portion of our bot.

Pastebin Setup

We can use Pastebin to share full dumps of daily information, beyond the 280 characters allowed on Twitter. Once you’ve created an account, you can access the API key on their API page here: https://pastebin.com/api. In addition to the API key, we will need the account’s username and password to authenticate and post on behalf of the user. We will cover this more in the final script.

Our Snitch Script

With access to the Twitter and Pastebin APIs, we are ready to build out our honeypot bot script. We will walk through the code in this section, though if you are in a hurry, check out the full code on GitHub here:

This script can run on the same system as our Moloch or Elasticsearch instance, or on another host with access to the Elasticsearch index. We need to ensure the script is able to access the internet and we can set it up to execute automatically, such as through a scheduled task.

Our goal with this script is to generate a daily summary of activity observed in the honeypot, report on the top activity, and provide a quick snapshot in 280 characters or less, with full frequency counts available on Pastebin.

Prep Work

First we need to clone the rdpsnitch repository with git clone https://github.com/dfir-honeypots/rdpsnitch. Once downloaded, we can set up our dependencies by running pip install -r requirements.txt. Please ensure you are using Python 3!

Next we will edit the “constants-template.py” file with our API keys and other configuration options, saving it to a file named “constants.py”:

# Please fill in each of the below with the correct values for your 
# environment and rename to constants.py.
#
# DO NOT COMMIT TO GIT OR EXPOSE PUBLICLY!! CONTAINS API KEY/SECRETS
#
ELASTICSEARCH_HOST = ''
ELASTICSEARCH_PORT = ''
PASTEBIN_API_KEY = ''
PASTEBIN_USER = ''
PASTEBIN_PASS = ''
TWITTER_API_KEY = ''
TWITTER_SECRET = ''
TWITTER_ACCESS_TOKEN = ''
TWITTER_ACCESS_SECRET = ''
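
Before the script runs, it can help to confirm that every setting has been filled in. The following is a minimal sketch, not part of the rdpsnitch repository: the helper name find_missing_settings and the use of SimpleNamespace as a stand-in for the constants module are assumptions for illustration.

```python
from types import SimpleNamespace

# Setting names taken from the constants template above.
REQUIRED_SETTINGS = (
    'ELASTICSEARCH_HOST', 'ELASTICSEARCH_PORT',
    'PASTEBIN_API_KEY', 'PASTEBIN_USER', 'PASTEBIN_PASS',
    'TWITTER_API_KEY', 'TWITTER_SECRET',
    'TWITTER_ACCESS_TOKEN', 'TWITTER_ACCESS_SECRET',
)


def find_missing_settings(settings, required=REQUIRED_SETTINGS):
    """Return the required setting names that are absent or blank.

    `settings` is any object with attributes, such as the imported
    constants module. Hypothetical helper, not part of rdpsnitch.
    """
    missing = []
    for name in required:
        value = getattr(settings, name, None)
        if value is None or str(value).strip() == '':
            missing.append(name)
    return missing


# Stand-in for `import constants`, with one value left blank.
example = SimpleNamespace(**{name: 'x' for name in REQUIRED_SETTINGS})
example.PASTEBIN_PASS = ''
print(find_missing_settings(example))
```

In the real script, the call would be find_missing_settings(constants), exiting early if the returned list is not empty.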

We then move to the “rdp-snitch.py” script, starting with our imports and constants:

"""This script contains code to pull data from an Elasticsearch index with Moloch parsed PCAP data of RDP (3389/TCP) traffic 
and aggregate it into data that is posted to Pastebin and
shared on Twitter.
"""
from datetime import datetime
from urllib.parse import quote
import time
from tqdm import tqdm
import tweepy
import constants
import requests
from elasticsearch import Elasticsearch
__author__ = 'Chapin Bryce'
__version__ = 20200120
__desc__ = 'Utility to gather and report RDP Scanner information'
SEARCH_SIZE = 1000
SCROLL_TIMEOUT = '2m'

We then build out our class to handle the statistic generation from Elasticsearch. While a bit manual, we specify each of the user, IP, and ASN data types we want to pivot and report on.

class GatherRDPData(object):
    def __init__(self, es_host, es_port):
        super().__init__()
        self.es = Elasticsearch(['{}:{}'.format(es_host, es_port)])
        self.reports = {}

    def run(self):
        # Gather data
        all_data = self.run_custom_agg(['user', 'srcIp', 'srcASN'])
        self.users = all_data['user']
        self.ips = all_data['srcIp']
        self.asns = all_data['srcASN']
        self.total = all_data['__total']
        # Run reports
        self.reports['users_txt'] = self.format_txt_report(
            all_data['user'], ['count', 'user'])
        self.reports['ips_txt'] = self.format_txt_report(
            all_data['srcIp'], ['count', 'srcIp'])
        self.reports['asns_txt'] = self.format_txt_report(
            all_data['srcASN'], ['count', 'srcASN'])

Elasticsearch has functionality to aggregate data, though it has some limitations. For this reason we have implemented a slower approach for aggregating the records.

    def run_custom_agg(self, agg_fields):
        # Open a scroll over the last 24 hours of sessions,
        # returning only the fields we want to count
        res = self.es.search(
            index="sessions2*",
            size=SEARCH_SIZE,
            scroll=SCROLL_TIMEOUT,
            _source_includes=agg_fields,
            body={
                "query": {
                    "range": {
                        "firstPacket": {"from": "now-1d", "to": "now"}}
                }
            }
        )
        agg_data = self.agg_scroll(res, agg_fields)
        # Sort each field's values by count, highest first
        sorted_data = {'__total': agg_data['__total']}
        for field in agg_fields:
            data_list = [{field: k, 'count': v}
                         for k, v in agg_data[field].items()]
            sorted_data[field] = sorted(
                data_list, key=lambda x: x['count'], reverse=True)
        return sorted_data

    def agg_scroll(self, data, agg_fields):
        all_data = {f: {} for f in agg_fields}
        # Get the scroll ID
        sid = data['_scroll_id']
        scroll_size = len(data.get('hits', {}).get('hits', []))
        total_docs = data.get('hits', {}).get(
            'total', {}).get('value', 0)
        all_data['__total'] = total_docs
        pbar = tqdm(desc="Aggregating",
                    total=total_docs,
                    unit=' docs',
                    unit_scale=True)
        while scroll_size > 0:
            # Before scroll, process current batch of hits
            agg_data = data.get('hits', {}).get('hits', [])
            if not agg_data:
                break
            # Count records
            for item in agg_data:
                field_data = item['_source']
                if not len(field_data):
                    continue
                for agg_field in agg_fields:
                    if agg_field not in field_data:
                        continue
                    # Treat single values and lists the same way
                    terms = field_data[agg_field]
                    if not isinstance(terms, list):
                        terms = [terms]
                    for term in terms:
                        if term == '':
                            term = '_no_value_'
                        if term not in all_data[agg_field]:
                            all_data[agg_field][term] = 0
                        all_data[agg_field][term] += 1
            pbar.update(len(agg_data))
            data = self.es.scroll(scroll_id=sid,
                                  scroll=SCROLL_TIMEOUT)
            # Update the scroll ID
            sid = data['_scroll_id']
            # Get the number of results returned by the last scroll
            scroll_size = len(data.get('hits', {}).get('hits', []))
        pbar.close()
        return all_data
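
The tallying logic in agg_scroll() can be tried without an Elasticsearch instance. The sketch below is an illustration using collections.Counter rather than the script's own code. The function name count_terms and the sample hits are made up, with the hits shaped like the documents a scroll returns.

```python
from collections import Counter


def count_terms(hits, agg_fields):
    """Tally how often each value appears per field across hits.

    Hypothetical helper that mirrors the counting rules of agg_scroll():
    list values are expanded and empty strings become '_no_value_'.
    """
    counts = {field: Counter() for field in agg_fields}
    for hit in hits:
        source = hit.get('_source', {})
        for field in agg_fields:
            if field not in source:
                continue
            values = source[field]
            if not isinstance(values, list):
                values = [values]
            for term in values:
                counts[field]['_no_value_' if term == '' else term] += 1
    return counts


# Made-up sample data for illustration only.
sample_hits = [
    {'_source': {'user': ['admin', 'test'], 'srcIp': '198.51.100.7'}},
    {'_source': {'user': ['admin'], 'srcIp': '198.51.100.7'}},
    {'_source': {'user': [''], 'srcIp': '203.0.113.9'}},
]
counts = count_terms(sample_hits, ['user', 'srcIp'])
print(counts['user'].most_common())
```

Counter.most_common() returns values ordered by descending count, which is the same ordering run_custom_agg() produces with sorted().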

Now that our aggregation functions are in place, we can move to the reporting phase. We have built a simple function to format our text-based report for ease of displaying in Pastebin. We could build similar functions to report to JSON or other formats as desired.

    def format_txt_report(self, dataset, header):
        report = f"{header[0]} {header[1]}\n"
        for item in dataset:
            report += "{} {}\n".format(item[header[0]],
                                       item[header[1]])
        return report
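
As one example of another output format, a JSON variant could look like the sketch below. The function name format_json_report is hypothetical. It is written as a standalone function and assumes the same list-of-dicts dataset that run_custom_agg() produces.

```python
import json


def format_json_report(dataset, header):
    """Serialize the selected keys of each record as a JSON array.

    Hypothetical counterpart to format_txt_report(); `header` lists the
    keys to keep, in output order.
    """
    rows = [{key: item[key] for key in header} for item in dataset]
    return json.dumps(rows, indent=2)


# Made-up records shaped like the output of run_custom_agg().
sample = [{'user': 'admin', 'count': 42}, {'user': 'test', 'count': 7}]
print(format_json_report(sample, ['count', 'user']))
```

If a report like this were posted to Pastebin, the data_fmt argument would likely need to change from 'text' to a JSON format value; check the Pastebin API page for the accepted names.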

That’s all that goes into our GatherRDPData() summarizing class. We will next define our Pastebin posting function, as the API has a multi-step process for posting data to an account programmatically:

def post_pastebin(data, title, data_fmt):
    # Step 1: log in to obtain a session key for the user
    sess_res = requests.post(
        'https://pastebin.com/api/api_login.php',
        {
            'api_user_password': constants.PASTEBIN_PASS,
            'api_user_name': constants.PASTEBIN_USER,
            'api_dev_key': constants.PASTEBIN_API_KEY
        }
    )
    if 'Bad' in sess_res.text:
        print(sess_res.text)
        return sess_res

    # Step 2: create the paste on behalf of that user
    url = 'https://pastebin.com/api/api_post.php'
    res = requests.post(url, {
        # Required
        'api_dev_key': constants.PASTEBIN_API_KEY,
        'api_user_key': sess_res.text,
        'api_paste_code': data,
        'api_option': 'paste',
        # Optional
        'api_paste_format': data_fmt,
        'api_paste_name': quote(title),
        'api_paste_private': 0  # Public
    })
    if 'Bad' in res.text:
        print("Error posting to pastebin")
        print(res.text)
    else:
        print(res.text)
    return res

With these components in place, we can start calling them. First we set up the GatherRDPData() class, passing along the Elasticsearch configuration details. We then run the aggregation and call post_pastebin() once per report to create each new paste. We capture each returned value so we can reference the direct URL of the paste. The sleep interval ensures that we don’t publish to Pastebin too quickly.

if __name__ == "__main__":

    # Gather data
    gather = GatherRDPData(
        constants.ELASTICSEARCH_HOST, constants.ELASTICSEARCH_PORT)
    gather.run()
    # Write to Pastebin
    now = datetime.now().strftime('%Y-%m-%d')
    title_fmt = "{}_".format(now)
    pastebin_sleep = 60
    users_txt_res = post_pastebin(
        gather.reports['users_txt'], title_fmt+'users.txt', 'text')
    time.sleep(pastebin_sleep)
    ips_txt_res = post_pastebin(
        gather.reports['ips_txt'], title_fmt+'ips.txt', 'text')
    time.sleep(pastebin_sleep)
    asns_txt_res = post_pastebin(
        gather.reports['asns_txt'], title_fmt+'asns.txt', 'text')

We now can generate our tweets. We will plan on two tweets: one that shares a summary of the top 3 hits from each data category, and a second that shows where to get full data on Pastebin. We will start by setting up our first tweet:

    summary = f"{datetime.now().strftime('%Y-%m-%d')} RDP #Honeypot IOCs "
    summary += f"- {gather.total:,} scans\n\nTop IPs:\n{top_ips}\n\n"
    summary += f"Top Users:\n{top_users}\n\nTop ASNs:\n{top_asns}"
    if len(summary) > 230:
        summary = summary[:230]+"..."  # Trim ASNs if needed
    summary += "\n\nLinks below with details. #DFIR #InfoSec"
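
The summary above uses top_ips, top_users, and top_asns, which this walkthrough does not define. One plausible way to build them, assuming each is a newline-separated list of the three most frequent values, is sketched below. The helper name top_values and the output layout are assumptions, and the real script may format these differently.

```python
def top_values(dataset, field, limit=3):
    """Return the first `limit` values of `field`, one per line.

    Assumes `dataset` is already sorted by count, descending, as the
    lists returned by run_custom_agg() are. Hypothetical helper.
    """
    return "\n".join(str(item[field]) for item in dataset[:limit])


# Made-up, pre-sorted sample standing in for gather.ips.
sample_ips = [
    {'srcIp': '198.51.100.7', 'count': 120},
    {'srcIp': '203.0.113.9', 'count': 80},
    {'srcIp': '192.0.2.44', 'count': 15},
    {'srcIp': '192.0.2.45', 'count': 3},
]
top_ips = top_values(sample_ips, 'srcIp')
print(top_ips)
```

In the script this would be called as top_values(gather.ips, 'srcIp'), and likewise with gather.users and gather.asns, before the summary is built.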

We can then define our second tweet with Pastebin links and a few more hashtags for increased visibility:

    pastebin_summary = "Pastebin links with full 24-hr RDP "
    pastebin_summary += f"#Honeypot IOC Lists:\nUsers: {users_txt_res.text}\n"
    pastebin_summary += f"IPs: {ips_txt_res.text}\nASNs: {asns_txt_res.text}"
    if len(pastebin_summary) <= 190:
        # Room to spare, so add hashtags
        pastebin_summary += "\n\n#DFIR #InfoSec #CyberSec #SOC #Hunt "
        pastebin_summary += "#Blueteam #SecurityOperations #SecOps #Security"
    elif len(pastebin_summary) > 280:
        # Too long for a single tweet, so trim
        pastebin_summary = pastebin_summary[:270]+"..."
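
Both tweets rely on the same trim-to-fit pattern, which could be factored into one function. This is only a sketch: the name fit_tweet is hypothetical, and it counts plain Python characters, which may differ from how Twitter weights URLs and some Unicode characters.

```python
TWEET_LIMIT = 280  # Character limit assumed for a standard tweet.


def fit_tweet(text, suffix="", limit=TWEET_LIMIT, ellipsis="..."):
    """Return text + suffix, trimming text so the result fits limit.

    Hypothetical helper; lengths are measured with len(), not with
    Twitter's own weighted counting rules.
    """
    if len(text) + len(suffix) <= limit:
        return text + suffix
    room = limit - len(suffix) - len(ellipsis)
    if room < 0:
        raise ValueError("suffix and ellipsis alone exceed the limit")
    return text[:room] + ellipsis + suffix


short = fit_tweet("Top IPs: 198.51.100.7", "\n\n#DFIR")
trimmed = fit_tweet("x" * 400, "\n\n#DFIR")
print(len(short), len(trimmed))
```

Passing the hashtags as the suffix keeps them intact, so only the body text is shortened when the message runs long.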

And the easiest part, we now post to Twitter:

    auth = tweepy.OAuthHandler(
        constants.TWITTER_API_KEY, constants.TWITTER_SECRET)
    auth.set_access_token(
        constants.TWITTER_ACCESS_TOKEN,
        constants.TWITTER_ACCESS_SECRET)
    tw_api = tweepy.API(auth)
    resp1 = tw_api.update_status(summary)
    resp2 = tw_api.update_status(
        pastebin_summary, in_reply_to_status_id=resp1.id)

By providing the resp1.id value, we ensure the second tweet appears as a reply to the first tweet.

And voilà — we have a functioning Twitter bot that should share a Tweet and Pastebin posts like the below:

RDPSnitch Tweets generated by our script

Feel free to customize these messages as you see fit, though remember the 280-character limit!

Concluding Thoughts

In this mini-series, we have set up our honeypot, extracted valuable features from our PCAP data, and now have operationalized this intel to share with the community. Thanks to the hard work of many in our field, this process is approachable and possible to implement within an evening or weekend.

The way we can repay those who have built these great tools is to put them to use in improving the industry.

Please think about methods for operationalizing this data to assist the community or improvements to this workflow to provide greater value. This can be in the form of GitHub issues on the RDPSnitch project with bug reports or merge requests, writing your own series on implementing this — or a similar — honeypot solution, or maybe generating additional statistics to highlight useful intelligence gathered from the honeypot.

DFIR professional, skier, aviation nerd. Co-author of Learning Python for Forensics & Python Forensics Cookbook. Message me for free links to any of my articles