244 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Bash
		
	
	
	
	
	
		
		
			
		
	
	
			244 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Bash
		
	
	
	
	
	
|   | from os import system, name | ||
|  | from typing import TypedDict | ||
|  | # from core import pack | ||
|  | import json | ||
|  | import sys | ||
|  | from PyQt6.QtCore import * | ||
|  | 
 | ||
|  | 
 | ||
def pack(s):
    """Encode a JSON document as a compact, URL-safe text token.

    The text is first checked with ``json.loads``.  If that fails, nothing is
    encoded and the string ``'ERROR: <exception message>'`` is returned
    instead of raising.  Otherwise the UTF-8 bytes are compressed with Qt's
    ``qCompress`` and rendered as base64 using the URL-safe alphabet with the
    trailing ``=`` padding omitted.
    """
    try:
        json.loads(s)
    except Exception as parse_error:
        # Failures are reported through the return value, not an exception.
        return f'ERROR: {parse_error}'

    url_safe_unpadded = (
        QByteArray.Base64Option.Base64UrlEncoding
        | QByteArray.Base64Option.OmitTrailingEquals
    )
    compressed = qCompress(QByteArray(s.encode()))
    return str(compressed.toBase64(url_safe_unpadded), 'utf-8')
|  | 
 | ||
class WireguardConfInterfaceData(TypedDict):
    """Settings taken from the ``[Interface]`` section of a WireGuard .conf file.

    Every value is kept as the raw text found to the right of ``=``.
    """

    # Client private key.
    PrivateKey: str
    # Interface address(es); consumers in this file split the value on ','
    # and '/' and use the first entry.
    Address: str
    # DNS server(s); consumers in this file split the value on ','.
    DNS: str
|  | 
 | ||
|  | 
 | ||
class WireguardConfAdditionalInterfaceData(WireguardConfInterfaceData):
    """``[Interface]`` settings extended with the amnezia-wg specific keys.

    The values are declared as ``str`` because that is what is actually
    stored: they are written as text literals (for example ``'7'``) and are
    rendered verbatim into ``key = value`` lines of the config.
    """

    Jc: str
    Jmin: str
    Jmax: str
    S1: str
    S2: str
    H1: str
    H2: str
    H3: str
    H4: str
|  | 
 | ||
|  | 
 | ||
class WireguardConfPeerData(TypedDict):
    """Settings taken from the ``[Peer]`` section of a WireGuard .conf file.

    Every value is kept as the raw text found to the right of ``=``.
    """

    # Public key of the peer (the server side in this script).
    PublicKey: str
    # Pre-shared key for the peer.
    PresharedKey: str
    AllowedIPs: str
    # Peer address; consumers in this file split it on ':' into host and port.
    Endpoint: str
    Port: str
|  | 
 | ||
|  | 
 | ||
class WireguardConfFullData(TypedDict):
    """A parsed WireGuard .conf file: one mapping per supported section."""

    # Key/value pairs collected from the ``[Interface]`` section.
    Interface: WireguardConfInterfaceData
    # Key/value pairs collected from the ``[Peer]`` section.
    Peer: WireguardConfPeerData
|  | 
 | ||
|  | 
 | ||
class WireguardConfParser:
    """Parser for WireGuard ``.conf`` files.

    Only the ``[Interface]`` and ``[Peer]`` sections are collected.  Values
    are kept as text, exactly as written to the right of the first ``=``.
    """

    def __init__(self, conf_file: str):
        """Remember the path of the .conf file; nothing is read yet."""
        self.CONF_FILE = conf_file

    def read_data(self) -> str:
        """Return the whole content of the .conf file as a single string.

        Raises:
            OSError: if the file cannot be opened or read.
        """
        with open(self.CONF_FILE, 'r', encoding='utf-8') as file:
            return file.read()

    def pack_config_data(self) -> "WireguardConfFullData":
        """Parse the .conf file into a ``WireguardConfFullData`` mapping.

        Parsing rules:

        * surrounding whitespace is ignored on every line;
        * blank lines and lines starting with ``#`` are skipped;
        * ``[Interface]`` / ``[Peer]`` select the section that receives the
          following ``key = value`` lines; any other ``[...]`` header starts
          a section whose lines are ignored;
        * a line is split on its first ``=`` only, so values that contain
          ``=`` themselves (base64 keys) stay intact;
        * lines without ``=`` and lines before the first known section are
          ignored;
        * a key that appears twice keeps the last value.

        Returns:
            A dict with the keys ``'Interface'`` and ``'Peer'``.  Both are
            always present, possibly empty.
        """
        interface_data: "WireguardConfInterfaceData" = {}
        peer_data: "WireguardConfPeerData" = {}
        known_sections = {'[Interface]': interface_data, '[Peer]': peer_data}

        # Mapping that receives the next key/value pair; None while outside
        # of a supported section.
        current_section = None

        for raw_line in self.read_data().split('\n'):
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue

            if line.startswith('[') and line.endswith(']'):
                current_section = known_sections.get(line)
                continue

            key, separator, value = line.partition('=')
            if current_section is None or not separator:
                continue
            current_section[key.strip()] = value.strip()

        return {'Interface': interface_data, 'Peer': peer_data}
|  | 
 | ||
|  | 
 | ||
def unpack_config_data(packed_config_data: WireguardConfFullData) -> str:
    """Render parsed config data back into WireGuard ``.conf`` text.

    The ``[Interface]`` section comes first, then an empty line, then the
    ``[Peer]`` section.  Keys are written in the order they are stored, one
    ``key = value`` per line.  For ``Address`` and ``DNS`` only the part
    before the first comma is written.  The result has no trailing newline.
    """
    first_entry_only = ('Address', 'DNS')
    rendered = ['[Interface]']

    for name, setting in packed_config_data['Interface'].items():
        if name in first_entry_only:
            setting = setting.split(',')[0]
        rendered.append(f"{name} = {setting}")

    # The leading newline yields the blank line between the two sections.
    rendered.append('\n[Peer]')
    rendered.extend(
        f"{name} = {setting}"
        for name, setting in packed_config_data['Peer'].items()
    )

    return "\n".join(rendered)
|  | 
 | ||
|  | 
 | ||
def add_parameters_in_config_data(packed_config_data: WireguardConfFullData):
    """Add the amnezia-wg parameters to the ``Interface`` section in place.

    After the call ``packed_config_data['Interface']`` holds the keys of
    ``WireguardConfAdditionalInterfaceData``; entries that already exist
    under the same names are overwritten.  Nothing is returned.
    """
    # NOTE(review): these values differ from the H1-H4 / Jc / S1 / S2
    # constants hard-coded in AmneziaWgBuilder - confirm which set is intended.
    packed_config_data['Interface'].update(
        Jc='7',
        Jmin='50',
        Jmax='1000',
        S1='116',
        S2='61',
        H1='1139437039',
        H2='1088834137',
        H3='977318325',
        H4='1583407056',
    )
|  | 
 | ||
|  | 
 | ||
class AmneziaWgBuilder:
    """Build the Amnezia ``amnezia-awg`` JSON document for a parsed config.

    The instance keeps the parsed WireGuard data and a free-text description;
    ``generate_json`` turns them into the JSON text and ``build`` prints the
    packed form of that text.
    """

    # Parameters written both into the "awg" container entry and into its
    # embedded "last_config" document.
    _AWG_PARAMETERS = {
        "H1": "32387182",
        "H2": "1638522486",
        "H3": "1724528624",
        "H4": "172455276",
        "Jc": "8",
        "Jmax": "1000",
        "Jmin": "50",
        "S1": "26",
        "S2": "74",
    }
    _PRIMARY_DNS = '1.1.1.1'
    _SECONDARY_DNS = '1.0.0.1'

    def __init__(self, wireguard_config_data: "WireguardConfFullData", description: str):
        """Store the parsed config and the description put into the JSON."""
        self.WIREGUARD_CONFIG_DATA = wireguard_config_data
        self.DESCRIPTION = description

    def build(self):
        """Generate the JSON document, pack it and print the result."""
        print(pack(self.generate_json()))

    def _config_text(self) -> str:
        """Return the .conf text including the amnezia-wg parameters.

        The parameters are added to a private copy of the ``Interface``
        section, so the data handed to the constructor is left unmodified.
        """
        config_data = dict(self.WIREGUARD_CONFIG_DATA)
        config_data['Interface'] = dict(config_data['Interface'])
        add_parameters_in_config_data(config_data)
        return unpack_config_data(config_data)

    def get_string_wireguard_config_data(self):
        """Return the .conf text with every newline replaced by a literal ``\\n``."""
        return self._config_text().replace('\n', '\\n')

    def get_client_ip(self) -> str:
        """Return the first ``Address`` entry without its ``/prefix`` part."""
        first_address = self.WIREGUARD_CONFIG_DATA["Interface"]["Address"].split(",")[0]
        return first_address.split('/')[0]

    def _split_endpoint(self):
        """Split the peer ``Endpoint`` into a ``(host, port)`` pair of strings.

        The split happens at the last ``:`` so that a bracketed IPv6 host
        such as ``[2001:db8::1]:51820`` is handled; the host is returned
        exactly as written (brackets included).

        Raises:
            KeyError: if the peer has no ``Endpoint``.
            ValueError: if there is no ``:`` or the port is not a number.
        """
        endpoint = self.WIREGUARD_CONFIG_DATA['Peer']['Endpoint']
        host, separator, port = endpoint.rpartition(':')
        if not separator or not (port.isascii() and port.isdigit()):
            raise ValueError(f"Endpoint must look like host:port, got {endpoint!r}")
        return host, port

    def generate_json(self) -> str:
        """Return the Amnezia configuration as JSON text.

        The embedded ``last_config`` value is itself a JSON document (indented
        by four spaces and terminated by a newline).  It is produced with
        ``json.dumps`` so that every value is escaped correctly.

        Raises:
            KeyError: if ``Address``, ``PrivateKey``, ``Endpoint``,
                ``PresharedKey`` or ``PublicKey`` is missing.
            ValueError: if ``Endpoint`` is not of the form ``host:port``.
        """
        interface = self.WIREGUARD_CONFIG_DATA['Interface']
        peer = self.WIREGUARD_CONFIG_DATA['Peer']

        client_ip = self.get_client_ip()
        client_priv_key = interface['PrivateKey']
        config = self._config_text()
        host_name, port = self._split_endpoint()
        psk_key = peer["PresharedKey"]
        server_pub_key = peer['PublicKey']

        last_config = dict(self._AWG_PARAMETERS)
        last_config.update({
            "client_ip": client_ip,
            "client_priv_key": client_priv_key,
            "client_pub_key": "0",
            "config": config,
            "hostName": host_name,
            # A JSON number here, while the container entry below keeps text.
            "port": int(port),
            "psk_key": psk_key,
            "server_pub_key": server_pub_key,
        })

        awg = dict(self._AWG_PARAMETERS)
        awg.update({
            "last_config": json.dumps(last_config, indent=4, ensure_ascii=False) + '\n',
            "port": port,
            "transport_proto": "udp",
        })

        json_value = {
            "containers": [
                {
                    "awg": awg,
                    "container": "amnezia-awg",
                }
            ],
            "defaultContainer": "amnezia-awg",
            "description": f"{self.DESCRIPTION}",
            "dns1": self._PRIMARY_DNS,
            "dns2": self._SECONDARY_DNS,
            "hostName": host_name,
        }
        return json.dumps(json_value)
|  | 
 | ||
# Check the command line first: reading sys.argv[1] before this check would
# raise IndexError instead of showing the usage text when no id is given.
if len(sys.argv) != 2:
    print("Usage: python encode.py us.id")
    sys.exit(1)

user_id = sys.argv[1]
usid = f'ShmBot_{user_id}'


def encode_config(path):
    """Return the packed Amnezia key for the WireGuard .conf file at *path*.

    The file is parsed, turned into the Amnezia JSON document (with the
    module-level ``usid`` as its description) and then packed.
    """
    config_data = WireguardConfParser(path).pack_config_data()
    builder = AmneziaWgBuilder(config_data, usid)
    return pack(builder.generate_json())


conf_file = f'/etc/amnezia/amneziawg/keys/{user_id}/{user_id}.conf'
vpn_ = encode_config(conf_file)
print(vpn_)