digbot/digbot.py

113 lines
4.3 KiB
Python

# Copyright (C) 2022, 2023 Noisytoot
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import asyncio
import configparser
import socket
from irctokens import build, Line
from ircrobots import Bot as BaseBot
from ircrobots import Server as BaseServer
from ircrobots import ConnectionParams
from ircrobots.security import TLSNoVerify, TLSVerifyChain
import dns.resolver
# Settings are read once, at import time, from ``digbot.conf`` in the current
# working directory. A missing file or missing required key surfaces as a
# KeyError from the lookups below.
config = configparser.ConfigParser()
config.read('digbot.conf')

# --- [bot] section -------------------------------------------------------
# ``channels`` is a comma-separated list. Whitespace around each name is
# stripped and empty entries are dropped, so "#a, #b," yields ['#a', '#b'];
# an untrimmed " #b" would neither be joinable nor match incoming targets.
CHANNELS = [
    name.strip()
    for name in config['bot']['channels'].split(',')
    if name.strip()
]

# --- [server] section ----------------------------------------------------
SERVER_NAME = config['server'].get('name', 'irc')
NICK = config['server']['nick']
HOST = config['server']['host']
# NOTE(review): the default port is 6667 while ``tls`` defaults to on; most
# networks serve TLS on a different port -- confirm this default is intended.
PORT = config['server'].getint('port', fallback=6667)
TLS = config.getboolean('server', 'tls', fallback=True)
TLS_VERIFY = config.getboolean('server', 'tls-verify', fallback=True)

# --- [bot] section, continued --------------------------------------------
# Leading string that marks a channel message as a bot command.
PREFIX = config['bot']['prefix']
# Optional user mode string and oper credentials; None when not configured.
MODE = config['bot'].get('mode', None)
OPER_USERNAME = config['bot'].get('oper-username', None)
OPER_PASSWORD = config['bot'].get('oper-password', None)
# Seconds to wait for a DNS reply before giving up.
TIMEOUT = config['bot'].getint('timeout', fallback=5)
def handle_command(args):
    """Run one bot command and return the reply text, if any.

    Supported commands (after ``PREFIX`` is removed from ``args[0]``):

    * ``ping`` -- replies ``'pong'``.
    * ``resolve <domain> <rrtype> <server>`` -- sends one UDP DNS query for
      ``domain``/``rrtype`` to ``server`` (a hostname or IP literal, port 53)
      and reports the answer and authority sections of the reply.

    Args:
        args: The whitespace-split message; ``args[0]`` is the command word
            with ``PREFIX`` still attached.

    Returns:
        The reply as a string whose lines are separated by ``'\\n'`` (no
        trailing newline), or ``None`` when the command is unknown or
        ``resolve`` was given fewer than three arguments.

    Note:
        This function blocks for up to ``TIMEOUT`` seconds on the DNS query
        (plus however long ``getaddrinfo`` takes).
    """
    if not args:
        return None

    command = args[0][len(PREFIX):]
    args = args[1:]

    if command == "ping":
        return 'pong'
    if command != "resolve" or len(args) < 3:
        return None

    domain, rrtype, server_name = args[:3]

    try:
        # First address returned for the name; [4][0] is the host part of
        # the sockaddr tuple.
        server = socket.getaddrinfo(
            server_name, 53, proto=socket.IPPROTO_UDP)[0][4][0]
    except (socket.gaierror, UnicodeError):
        # UnicodeError: getaddrinfo IDNA-encodes the name and rejects labels
        # that are empty or too long before any lookup is attempted.
        return f'Error: getaddrinfo failed to find {server_name}'

    try:
        query = dns.message.make_query(domain, rrtype)
        answer = dns.query.udp(query, server, timeout=TIMEOUT)
    except dns.rdatatype.UnknownRdatatype:
        return 'Error: Unknown RR type'
    except dns.exception.Timeout:
        return f'Error: Query timed out after {TIMEOUT} seconds'
    except ValueError:
        return 'Error: ValueError'

    # Each section is a list of RRsets; report every RRset, not just the
    # first, so e.g. a CNAME followed by its A records is shown in full.
    lines = []
    for title, rrsets in (('Answer', answer.answer),
                          ('Authority', answer.authority)):
        if rrsets:
            lines.append(f'{title} section:')
            lines.extend(rrset.to_text() for rrset in rrsets)
        else:
            lines.append(f'Empty {title.lower()} section')
    return '\n'.join(lines)
class Server(BaseServer):
    """IRC connection that joins the configured channels and answers commands."""

    async def line_read(self, line: Line):
        """React to one line received from the IRC server.

        * On numeric ``001``: optionally send ``OPER`` and ``MODE`` (when
          configured), then ``JOIN`` every channel in ``CHANNELS``.
        * On a ``PRIVMSG`` to one of ``CHANNELS`` whose text starts with
          ``PREFIX``: run ``handle_command`` and send each non-empty line of
          its reply back to that channel.

        Any other line is ignored.
        """
        if line.command == "001":
            if OPER_USERNAME and OPER_PASSWORD:
                await self.send(build("OPER", [OPER_USERNAME, OPER_PASSWORD]))
            if MODE:
                await self.send(build("MODE", [NICK, MODE]))
            for channel in CHANNELS:
                await self.send(build("JOIN", [channel]))
            return

        # Guard the indexing below: a PRIVMSG needs a target and a text.
        if line.command != "PRIVMSG" or len(line.params) < 2:
            return
        target, text = line.params[0], line.params[1]
        if target not in CHANNELS or text is None or not text.startswith(PREFIX):
            return

        try:
            # handle_command performs blocking socket/DNS calls; run it in
            # the default executor so the event loop is not stalled while a
            # query is waiting for its reply or timeout.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None, handle_command, text.split())
        except Exception:
            # Deliberate catch-all so a failing command never takes the
            # connection down; BaseException (cancellation, Ctrl-C) still
            # propagates.
            response = 'Error: Unknown error in command handler'

        if response is None:
            return
        # One PRIVMSG per line of the reply; empty lines are skipped because
        # a PRIVMSG with no text carries nothing to display.
        for reply_line in str(response).splitlines():
            if reply_line:
                await self.send(build("PRIVMSG", [target, reply_line]))
class Bot(BaseBot):
    """Bot whose connections are handled by this module's ``Server`` class."""

    def create_server(self, name: str):
        """Return a ``Server`` bound to this bot and labelled ``name``."""
        server = Server(self, name)
        return server
async def main():
    """Connect the bot to the configured IRC server and run until it stops.

    The TLS mode is derived from the ``TLS`` and ``TLS_VERIFY`` settings:
    no TLS at all, TLS with certificate-chain verification, or TLS without
    verification.
    """
    if not TLS:
        tls_param = None
    elif TLS_VERIFY:
        tls_param = TLSVerifyChain()
    else:
        # Pass an *instance*: a bare class object is not an instance of
        # TLSNoVerify, so a library isinstance() check would never match it
        # and the "no verify" setting would be silently ignored.
        tls_param = TLSNoVerify()

    bot = Bot()
    params = ConnectionParams(NICK, HOST, PORT, tls_param)
    await bot.add_server(SERVER_NAME, params)
    await bot.run()


if __name__ == "__main__":
    asyncio.run(main())