awg-manager/encode.sh
2024-04-25 00:06:55 +03:00

244 lines
7.2 KiB
Bash

from os import system, name
from typing import TypedDict
# from core import pack
import json
import sys
from PyQt6.QtCore import *
def pack(s):
try:
json.loads(s)
except Exception as e:
return 'ERROR: ' + str(e)
ba = qCompress(QByteArray(s.encode()))
ba = ba.toBase64(QByteArray.Base64Option.Base64UrlEncoding | QByteArray.Base64Option.OmitTrailingEquals)
return str(ba, 'utf-8')
class WireguardConfInterfaceData(TypedDict):
"""
the interface parameters from wireguard .conf file
"""
PrivateKey: str
Address: str
DNS: str
class WireguardConfAdditionalInterfaceData(WireguardConfInterfaceData):
"""
additional parameters used by the amnezia-wg protocol
"""
Jc: int
Jmin: int
Jmax: int
S1: int
S2: int
H1: int
H2: int
H3: int
H4: int
class WireguardConfPeerData(TypedDict):
"""
the peer parameters from wireguard .conf file
"""
PublicKey: str
PresharedKey: str
AllowedIPs: str
Endpoint: str
Port: str
class WireguardConfFullData(TypedDict):
Interface: WireguardConfInterfaceData
Peer: WireguardConfPeerData
class WireguardConfParser:
"""
.conf file parser
"""
def __init__(self, conf_file: str):
self.CONF_FILE = conf_file
def read_data(self) -> str:
"""return a string with all the data from .conf file"""
with open(self.CONF_FILE, 'r') as file:
lines = file.readlines()
return "".join(lines)
def pack_config_data(self) -> WireguardConfFullData:
"""return packed data from .conf file
packed date is an instance of WireguardConfFullData
"""
wireguard_data: WireguardConfFullData = {}
interface_data: WireguardConfInterfaceData = {}
peer_data: WireguardConfPeerData = {}
data = self.read_data()
parsing_mode = ""
for line in data.split('\n'):
if line == '[Interface]':
parsing_mode = 'interface'
continue
elif line == '[Peer]':
parsing_mode = 'peer'
continue
line_list = [l.strip() for l in line.split('=', 1)]
if line_list == ['']:
continue
if parsing_mode == 'interface':
interface_data[line_list[0]] = line_list[1]
if parsing_mode == 'peer':
peer_data[line_list[0]] = line_list[1]
wireguard_data['Interface'] = interface_data
wireguard_data['Peer'] = peer_data
return wireguard_data
def unpack_config_data(packed_config_data: WireguardConfFullData) -> str:
"""return string with the unpacked data from WireguardConfFullData instance"""
data = ['[Interface]']
for interface_key in packed_config_data['Interface']:
value = packed_config_data['Interface'][interface_key]
if interface_key == 'Address':
value = value.split(',')[0]
if interface_key == 'DNS':
value = value.split(',')[0]
data.append(f"{interface_key} = {value}")
data.append('\n[Peer]')
for peer_key in packed_config_data['Peer']:
value = packed_config_data['Peer'][peer_key]
data.append(f"{peer_key} = {value}")
return "\n".join(data)
def add_parameters_in_config_data(packed_config_data: WireguardConfFullData):
"""updates packed data which is th instance of WireguardConfFullData
more specifically data is the instance of WireguardConfAdditionalInterfaceData
"""
additional_parameters: WireguardConfAdditionalInterfaceData = {
'Jc': '7',
'Jmin': '50',
'Jmax': '1000',
'S1': '116',
'S2': '61',
'H1': '1139437039',
'H2': '1088834137',
'H3': '977318325',
'H4': '1583407056'
}
packed_config_data['Interface'].update(additional_parameters)
class AmneziaWgBuilder:
"""
this class helps to encode (build) data from WireguardConfFullData instance
"""
def __init__(self, wireguard_config_data: WireguardConfFullData, description: str):
self.WIREGUARD_CONFIG_DATA = wireguard_config_data
self.DESCRIPTION = description
def build(self):
"""this method encodes information"""
json_data = self.generate_json()
print(pack(json_data))
def get_string_wireguard_config_data(self):
add_parameters_in_config_data(self.WIREGUARD_CONFIG_DATA.copy())
return unpack_config_data(self.WIREGUARD_CONFIG_DATA).replace('\n', '\\n')
def get_client_ip(self) -> str:
return self.WIREGUARD_CONFIG_DATA["Interface"]["Address"].split(",")[0].split('/')[0]
def generate_json(self) -> str:
client_ip = self.get_client_ip()
client_priv_key = self.WIREGUARD_CONFIG_DATA['Interface']['PrivateKey']
config = self.get_string_wireguard_config_data()
hostName, port = self.WIREGUARD_CONFIG_DATA['Peer']['Endpoint'].split(':')
psk_key = self.WIREGUARD_CONFIG_DATA['Peer']["PresharedKey"]
server_pub_key = self.WIREGUARD_CONFIG_DATA['Peer']['PublicKey']
PRIMARY_DNS, SECONDARY_DNS = '1.1.1.1', '1.0.0.1'
last_config = (
'{\n'
' "H1": "32387182",\n'
' "H2": "1638522486",\n'
' "H3": "1724528624",\n'
' "H4": "172455276",\n'
' "Jc": "8",\n'
' "Jmax": "1000",\n'
' "Jmin": "50",\n'
' "S1": "26",\n'
' "S2": "74",\n'
f' "client_ip": "{client_ip}",\n'
f' "client_priv_key": "{client_priv_key}",\n'
f' "client_pub_key": "0",\n'
f' "config": "{config}",\n'
f' "hostName": "{hostName}",\n'
f' "port": {port},\n'
f' "psk_key": "{psk_key}",\n'
f' "server_pub_key": "{server_pub_key}"\n'
'}\n'
)
json_value = {
"containers": [
{
"awg": {
"H1": "32387182",
"H2": "1638522486",
"H3": "1724528624",
"H4": "172455276",
"Jc": "8",
"Jmax": "1000",
"Jmin": "50",
"S1": "26",
"S2": "74",
"last_config": f'{last_config}',
"port": f"{port}",
"transport_proto": "udp"
},
"container": "amnezia-awg"
}
],
"defaultContainer": "amnezia-awg",
"description": f"{self.DESCRIPTION}",
"dns1": f"{PRIMARY_DNS}",
"dns2": f"{SECONDARY_DNS}",
"hostName": f"{hostName}"
}
return json.dumps(json_value)
id = sys.argv[1]
usid= f'ShmBot_{id}'
def encode_config(path):
file = WireguardConfParser(path)
file = file.pack_config_data()
awg = AmneziaWgBuilder(file, usid)
jssn_ = awg.generate_json()
vpn_ = pack(jssn_)
return vpn_
if len(sys.argv) != 2:
print("Usage: python encode.py us.id")
sys.exit(1)
conf_file = f'/etc/amnezia/amneziawg/keys/{id}/{id}.conf'
vpn_ = encode_config(conf_file)
print (vpn_)