Create encode.sh
This commit is contained in:
parent
e7d32678ac
commit
22f6226459
243
encode.sh
Normal file
243
encode.sh
Normal file
@ -0,0 +1,243 @@
|
||||
from os import system, name
|
||||
from typing import TypedDict
|
||||
# from core import pack
|
||||
import json
|
||||
import sys
|
||||
from PyQt6.QtCore import *
|
||||
|
||||
|
||||
def pack(s):
|
||||
try:
|
||||
json.loads(s)
|
||||
except Exception as e:
|
||||
return 'ERROR: ' + str(e)
|
||||
ba = qCompress(QByteArray(s.encode()))
|
||||
ba = ba.toBase64(QByteArray.Base64Option.Base64UrlEncoding | QByteArray.Base64Option.OmitTrailingEquals)
|
||||
return str(ba, 'utf-8')
|
||||
|
||||
class WireguardConfInterfaceData(TypedDict):
|
||||
"""
|
||||
the interface parameters from wireguard .conf file
|
||||
"""
|
||||
PrivateKey: str
|
||||
Address: str
|
||||
DNS: str
|
||||
|
||||
|
||||
class WireguardConfAdditionalInterfaceData(WireguardConfInterfaceData):
|
||||
"""
|
||||
additional parameters used by the amnezia-wg protocol
|
||||
"""
|
||||
Jc: int
|
||||
Jmin: int
|
||||
Jmax: int
|
||||
S1: int
|
||||
S2: int
|
||||
H1: int
|
||||
H2: int
|
||||
H3: int
|
||||
H4: int
|
||||
|
||||
|
||||
class WireguardConfPeerData(TypedDict):
|
||||
"""
|
||||
the peer parameters from wireguard .conf file
|
||||
"""
|
||||
PublicKey: str
|
||||
PresharedKey: str
|
||||
AllowedIPs: str
|
||||
Endpoint: str
|
||||
Port: str
|
||||
|
||||
|
||||
class WireguardConfFullData(TypedDict):
|
||||
Interface: WireguardConfInterfaceData
|
||||
Peer: WireguardConfPeerData
|
||||
|
||||
|
||||
class WireguardConfParser:
|
||||
"""
|
||||
.conf file parser
|
||||
"""
|
||||
def __init__(self, conf_file: str):
|
||||
self.CONF_FILE = conf_file
|
||||
|
||||
def read_data(self) -> str:
|
||||
"""return a string with all the data from .conf file"""
|
||||
with open(self.CONF_FILE, 'r') as file:
|
||||
lines = file.readlines()
|
||||
return "".join(lines)
|
||||
|
||||
def pack_config_data(self) -> WireguardConfFullData:
|
||||
"""return packed data from .conf file
|
||||
|
||||
packed date is an instance of WireguardConfFullData
|
||||
"""
|
||||
wireguard_data: WireguardConfFullData = {}
|
||||
interface_data: WireguardConfInterfaceData = {}
|
||||
peer_data: WireguardConfPeerData = {}
|
||||
data = self.read_data()
|
||||
parsing_mode = ""
|
||||
|
||||
for line in data.split('\n'):
|
||||
if line == '[Interface]':
|
||||
parsing_mode = 'interface'
|
||||
continue
|
||||
elif line == '[Peer]':
|
||||
parsing_mode = 'peer'
|
||||
continue
|
||||
|
||||
line_list = [l.strip() for l in line.split('=', 1)]
|
||||
if line_list == ['']:
|
||||
continue
|
||||
|
||||
if parsing_mode == 'interface':
|
||||
interface_data[line_list[0]] = line_list[1]
|
||||
if parsing_mode == 'peer':
|
||||
peer_data[line_list[0]] = line_list[1]
|
||||
|
||||
wireguard_data['Interface'] = interface_data
|
||||
wireguard_data['Peer'] = peer_data
|
||||
|
||||
return wireguard_data
|
||||
|
||||
|
||||
def unpack_config_data(packed_config_data: WireguardConfFullData) -> str:
|
||||
"""return string with the unpacked data from WireguardConfFullData instance"""
|
||||
data = ['[Interface]']
|
||||
|
||||
for interface_key in packed_config_data['Interface']:
|
||||
value = packed_config_data['Interface'][interface_key]
|
||||
|
||||
if interface_key == 'Address':
|
||||
value = value.split(',')[0]
|
||||
if interface_key == 'DNS':
|
||||
value = value.split(',')[0]
|
||||
|
||||
data.append(f"{interface_key} = {value}")
|
||||
|
||||
data.append('\n[Peer]')
|
||||
for peer_key in packed_config_data['Peer']:
|
||||
value = packed_config_data['Peer'][peer_key]
|
||||
data.append(f"{peer_key} = {value}")
|
||||
|
||||
return "\n".join(data)
|
||||
|
||||
|
||||
def add_parameters_in_config_data(packed_config_data: WireguardConfFullData):
|
||||
"""updates packed data which is th instance of WireguardConfFullData
|
||||
|
||||
more specifically data is the instance of WireguardConfAdditionalInterfaceData
|
||||
"""
|
||||
additional_parameters: WireguardConfAdditionalInterfaceData = {
|
||||
'Jc': '7',
|
||||
'Jmin': '50',
|
||||
'Jmax': '1000',
|
||||
'S1': '116',
|
||||
'S2': '61',
|
||||
'H1': '1139437039',
|
||||
'H2': '1088834137',
|
||||
'H3': '977318325',
|
||||
'H4': '1583407056'
|
||||
}
|
||||
|
||||
packed_config_data['Interface'].update(additional_parameters)
|
||||
|
||||
|
||||
class AmneziaWgBuilder:
|
||||
"""
|
||||
this class helps to encode (build) data from WireguardConfFullData instance
|
||||
"""
|
||||
def __init__(self, wireguard_config_data: WireguardConfFullData, description: str):
|
||||
self.WIREGUARD_CONFIG_DATA = wireguard_config_data
|
||||
self.DESCRIPTION = description
|
||||
|
||||
def build(self):
|
||||
"""this method encodes information"""
|
||||
json_data = self.generate_json()
|
||||
print(pack(json_data))
|
||||
|
||||
def get_string_wireguard_config_data(self):
|
||||
add_parameters_in_config_data(self.WIREGUARD_CONFIG_DATA.copy())
|
||||
return unpack_config_data(self.WIREGUARD_CONFIG_DATA).replace('\n', '\\n')
|
||||
|
||||
def get_client_ip(self) -> str:
|
||||
return self.WIREGUARD_CONFIG_DATA["Interface"]["Address"].split(",")[0].split('/')[0]
|
||||
|
||||
def generate_json(self) -> str:
|
||||
client_ip = self.get_client_ip()
|
||||
client_priv_key = self.WIREGUARD_CONFIG_DATA['Interface']['PrivateKey']
|
||||
config = self.get_string_wireguard_config_data()
|
||||
hostName, port = self.WIREGUARD_CONFIG_DATA['Peer']['Endpoint'].split(':')
|
||||
psk_key = self.WIREGUARD_CONFIG_DATA['Peer']["PresharedKey"]
|
||||
server_pub_key = self.WIREGUARD_CONFIG_DATA['Peer']['PublicKey']
|
||||
PRIMARY_DNS, SECONDARY_DNS = '1.1.1.1', '1.0.0.1'
|
||||
last_config = (
|
||||
'{\n'
|
||||
' "H1": "32387182",\n'
|
||||
' "H2": "1638522486",\n'
|
||||
' "H3": "1724528624",\n'
|
||||
' "H4": "172455276",\n'
|
||||
' "Jc": "8",\n'
|
||||
' "Jmax": "1000",\n'
|
||||
' "Jmin": "50",\n'
|
||||
' "S1": "26",\n'
|
||||
' "S2": "74",\n'
|
||||
f' "client_ip": "{client_ip}",\n'
|
||||
f' "client_priv_key": "{client_priv_key}",\n'
|
||||
f' "client_pub_key": "0",\n'
|
||||
f' "config": "{config}",\n'
|
||||
f' "hostName": "{hostName}",\n'
|
||||
f' "port": {port},\n'
|
||||
f' "psk_key": "{psk_key}",\n'
|
||||
f' "server_pub_key": "{server_pub_key}"\n'
|
||||
'}\n'
|
||||
)
|
||||
|
||||
json_value = {
|
||||
"containers": [
|
||||
{
|
||||
"awg": {
|
||||
"H1": "32387182",
|
||||
"H2": "1638522486",
|
||||
"H3": "1724528624",
|
||||
"H4": "172455276",
|
||||
"Jc": "8",
|
||||
"Jmax": "1000",
|
||||
"Jmin": "50",
|
||||
"S1": "26",
|
||||
"S2": "74",
|
||||
"last_config": f'{last_config}',
|
||||
"port": f"{port}",
|
||||
"transport_proto": "udp"
|
||||
},
|
||||
"container": "amnezia-awg"
|
||||
}
|
||||
],
|
||||
"defaultContainer": "amnezia-awg",
|
||||
"description": f"{self.DESCRIPTION}",
|
||||
"dns1": f"{PRIMARY_DNS}",
|
||||
"dns2": f"{SECONDARY_DNS}",
|
||||
"hostName": f"{hostName}"
|
||||
}
|
||||
return json.dumps(json_value)
|
||||
|
||||
id = sys.argv[1]
|
||||
usid= f'ShmBot_{id}'
|
||||
def encode_config(path):
|
||||
|
||||
file = WireguardConfParser(path)
|
||||
file = file.pack_config_data()
|
||||
|
||||
awg = AmneziaWgBuilder(file, usid)
|
||||
jssn_ = awg.generate_json()
|
||||
vpn_ = pack(jssn_)
|
||||
return vpn_
|
||||
|
||||
if len(sys.argv) != 2:
|
||||
print("Usage: python encode.py us.id")
|
||||
sys.exit(1)
|
||||
|
||||
conf_file = f'/etc/amnezia/amneziawg/keys/{id}/{id}.conf'
|
||||
vpn_ = encode_config(conf_file)
|
||||
print (vpn_)
|
||||
Loading…
Reference in New Issue
Block a user