bot.py
import json
import os
import re

import discord
import pangea.exceptions as pe
from pangea.config import PangeaConfig
from pangea.services import DomainIntel
from pangea.services import Redact
from pangea.services import UrlIntel
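# Pangea authentication token and service domain.
# Both are read from the environment so that a real credential never has to be
# committed to the repository; the environment variable names are chosen here,
# and the original literals are kept as fallbacks so existing setups still load.
token = os.getenv("PANGEA_TOKEN", "your_pangea_token")
domain = os.getenv("PANGEA_DOMAIN", "aws.us.pangea.cloud")
config = PangeaConfig(domain=domain)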
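def find_domain(text):
    """Return the first domain name found in *text*, lowercased, or None.

    The match is case-insensitive and allows a final label of up to 63
    letters.  A lowercase-only pattern with a six-letter cap silently
    truncates input such as ``Example.com`` to ``xample.com`` and
    ``example.technology`` to ``example.techno``, so a lookup would be made
    for a different domain than the one the user typed.

    Args:
        text: Free-form text, e.g. the content of a ``!whois`` message.

    Returns:
        The matched domain in lowercase, or None when no domain is present.
    """
    domain_regex = r"(?:[a-z0-9](?:[a-z0-9\-]{0,61}[a-z0-9])?\.)+[a-z]{2,63}"
    match = re.search(domain_regex, text, re.IGNORECASE)
    if match is None:
        return None
    # Domain names are case-insensitive; normalise so callers see one form.
    return match.group(0).lower()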
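# Redaction function using Pangea Cloud
def go_redact(text):
    """Run *text* through the Pangea Redact service with the SECRETS ruleset.

    Args:
        text: The message content to inspect.

    Returns:
        ``""`` when the service returned the text unchanged, the redacted
        text when something was replaced, or ``False`` when the request
        raised ``PangeaAPIException``.  Callers must treat both ``""`` and
        ``False`` as "nothing to act on".
    """
    redact = Redact(token, config=config)
    # The input may contain the very secret being looked for, so only its
    # size is written to stdout, never the text itself.
    print(f"Checking {len(text)} characters for secrets")
    try:
        redact_response = redact.redact(text=text, rulesets=["SECRETS"])
        redacted_text = redact_response.result.redacted_text
        # Unchanged text means the ruleset matched nothing.
        if text == redacted_text:
            return ""
        print(f"Redacted text: {redacted_text}")
        return redacted_text
    except pe.PangeaAPIException as e:
        # The label names the service that was actually called.
        print(f"Redact Request Error: {e.response.summary}")
        for err in e.errors:
            print(f"\t{err.detail} \n")
        return False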
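# URL intelligence function using Pangea Cloud
def go_url_intel(url):
    """Look up the reputation of *url* with the Pangea URL Intel service.

    Args:
        url: The URL to check.

    Returns:
        The verdict string from the response on success.  When the request
        raises ``PangeaAPIException``, the detail of the last reported error,
        or ``""`` when the exception carries no errors.
    """
    intel = UrlIntel(token, config=config)
    try:
        response = intel.reputation(
            url=url,
            provider="crowdstrike",
            verbose=True,
            raw=True,
        )
        print(f"Response: {response.result}")
        return response.result.data.verdict
    except pe.PangeaAPIException as e:
        print(f"Request Error: {e.response.summary}")
        # Start from "" so an exception with an empty error list does not
        # leave the loop variable unbound when the function returns.
        last_detail = ""
        for err in e.errors:
            print(f"\t{err.detail} \n")
            last_detail = err.detail
        return last_detail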
def go_whois(domain):
    """Fetch WHOIS data for *domain* from the Pangea Domain Intel service.

    Args:
        domain: The domain name to look up.

    Returns:
        A five-item list ``[domain_name, domain_availability, created_date,
        registrar_name, registrant_organization]`` taken from the response,
        or None when the request raises ``PangeaAPIException``.
    """
    whois_client = DomainIntel(token, config=config)
    print("Checking domain...")
    wanted_fields = (
        "domain_name",
        "domain_availability",
        "created_date",
        "registrar_name",
        "registrant_organization",
    )
    try:
        response = whois_client.who_is(domain=domain, provider="whoisxml", verbose=True, raw=True)
        record = response.result.data
        print(f"Response: {record}")
        # Same fields, same order, as the caller unpacks them.
        return [getattr(record, field_name) for field_name in wanted_fields]
    except pe.PangeaAPIException as e:
        print(f"Request Error: {e.response.summary}")
        for problem in e.errors:
            print(f"\t{problem.detail} \n")
        return None
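# Discord client class
class MyClient(discord.Client):
    """Discord client that redacts secrets, flags malicious URLs and answers ``!whois``.

    For every message not sent by the bot itself, ``on_message``:
      1. passes the content to ``go_redact`` and, when something was redacted,
         deletes the original and posts the redacted text;
      2. passes each distinct URL in the content to ``go_url_intel`` and warns
         the channel when one is reported as malicious;
      3. answers messages starting with ``!whois`` with an embed built from
         ``go_whois``.
    """

    # URL pattern used to pick links out of message content; compiled once.
    _URL_PATTERN = re.compile(
        r'(http|https)://(?P<hostname>[a-zA-Z0-9-]{1,63}(?:\.[a-zA-Z0-9-]{1,63})*)[^\s]+'
    )

    async def on_ready(self):
        """Report on stdout that the bot is connected."""
        print(f'Logged on as {self.user}!')

    async def on_message(self, message):
        """Handle one incoming message (see the class docstring for the steps).

        NOTE(review): go_redact, go_url_intel and go_whois are plain
        synchronous functions, so the event loop is blocked while each
        request runs.
        """
        # Ignore messages from the bot itself
        if message.author == self.user:
            return

        # Log who wrote and how much, not what: the content may hold the
        # secret that is about to be removed from the channel.
        print(f'Message from {message.author} ({len(message.content)} characters)')

        # Redact sensitive information in the message content
        redacted_msg = go_redact(message.content)
        original_deleted = False
        # go_redact returns "" for "unchanged" and False for "request failed";
        # a truthiness test treats both as "nothing to do", so a service error
        # no longer deletes the user's message.
        if redacted_msg:
            try:
                await message.delete()
                original_deleted = True
            except discord.HTTPException as exc:
                # Covers missing permissions and already-deleted messages.
                print(f"Could not delete message containing a secret: {exc}")
            # Send a notification about the detected API key
            await message.channel.send("API Key found!")
            # Re-posting the redacted text only makes sense once the
            # original is gone.
            if original_deleted:
                await message.channel.send(redacted_msg)

        # Check every distinct URL in the message, not just the first, so a
        # harmless link placed in front cannot hide a malicious one.
        checked_urls = set()
        for found in self._URL_PATTERN.finditer(message.content):
            detected_url = found.group()
            if detected_url in checked_urls:
                continue
            checked_urls.add(detected_url)
            print(f"Detected URL: {detected_url}")
            if go_url_intel(detected_url) == "malicious":
                # A deleted message cannot be used as a reply target.
                await message.channel.send(
                    "Malicious website found!",
                    reference=None if original_deleted else message,
                )
                # One warning per message is enough.
                break

        if message.content.startswith('!whois'):
            input_domain = find_domain(message.content)
            if input_domain is None:
                return
            whois_response = go_whois(input_domain)
            if whois_response is None:
                return
            domain_name, domain_availability, created_date, registrar_name, registrant_organization = whois_response
            embed = discord.Embed(title="Whois", description="This is an important announcement for all members.", color=discord.Color.blue())
            embed.add_field(name="Domain Name", value=domain_name)
            embed.add_field(name="Domain Availability", value=domain_availability)
            embed.add_field(name="Created Date", value=created_date)
            embed.add_field(name="Registrar Name", value=registrar_name)
            embed.add_field(name="Registrant Organization", value=registrant_organization)
            if original_deleted:
                # The command message no longer exists, so post instead of replying.
                await message.channel.send(embed=embed)
            else:
                await message.reply(embed=embed, mention_author=True)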
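# Set up Discord intents
intents = discord.Intents.default()
# The message handler reads message.content, which requires this intent.
intents.message_content = True
# Create an instance of the Discord client
client = MyClient(intents=intents)
# Run the bot. The token is read from the environment (variable name chosen
# here) so that it is not committed with the source; the original placeholder
# is kept as the fallback.
client.run(os.getenv("DISCORD_BOT_TOKEN", 'your_bot_token'))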