"""
LND related functionality
"""
import pathlib
from subprocess import call, run
from os import path
from json import dumps
import base64
from requests import get, post
import noma.config as cfg
def check_wallet():
    """
    Create the LND wallet if the lnd directory exists and has no wallet yet.

    When ``cfg.LND_PATH`` exists and ``cfg.WALLET_PATH`` does not, wallet
    creation is delegated to ``create_wallet()``. In every other case an
    explanatory error is printed and nothing is created.

    :return: None (the outcome is only reported on stdout)
    """
    # Guard: without the lnd directory there is nothing to initialize
    if not cfg.LND_PATH.exists():
        print("❌ Error: lnd directory does not exist!")
        return

    # Guard: never touch an already existing wallet
    if cfg.WALLET_PATH.exists():
        print("❌ Error: LND not initialized")
        print("Wallet already exists!")
        print("Please backup and move: " + str(cfg.WALLET_PATH))
        print("and then restart lnd")
        return

    create_wallet()
def encodemacaroons(macaroonfile=cfg.MACAROON_PATH, tlsfile=cfg.TLS_CERT_PATH):
    """
    base64url encode macaroon and TLS certificate

    :param macaroonfile: path to the macaroon file (``~`` is expanded)
    :param tlsfile: path to the PEM encoded TLS certificate (``~`` is expanded)
    :return dict: ``{"status": "OK", "certificate": bytes, "macaroon": bytes}``
        when both files exist, otherwise ``{"status": "File Not Found"}``
    """
    # Expand "~" once so that the existence check and open() look at the
    # same path (previously only open() expanded it).
    macaroon_path = path.expanduser(str(macaroonfile))
    tls_path = path.expanduser(str(tlsfile))

    if not (path.exists(macaroon_path) and path.exists(tls_path)):
        return {"status": "File Not Found"}

    with open(macaroon_path, "rb") as f:
        macaroon_bytes = f.read()
    with open(tls_path, "rb") as f:
        tls_bytes = f.read()

    macaroonencoded = base64.urlsafe_b64encode(macaroon_bytes)

    # The PEM body is already base64; drop the armor and line breaks, then
    # map the standard alphabet to the URL-safe one and remove padding.
    tlsdecoded = tls_bytes.decode("utf-8")
    tlstrim = (
        tlsdecoded.replace("\r", "")  # tolerate CRLF line endings
        .replace("\n", "")
        .replace("-----BEGIN CERTIFICATE-----", "")
        .replace("-----END CERTIFICATE-----", "")
        .replace("+", "-")
        .replace("/", "_")
        .replace("=", "")
    )
    tlsencoded = tlstrim.encode("utf-8")

    return {
        "status": "OK",
        "certificate": tlsencoded,
        "macaroon": macaroonencoded,
    }
def connectstring(
    hostname=cfg.URL_GRPC,
    macaroonfile=cfg.MACAROON_PATH,
    tlsfile=cfg.TLS_CERT_PATH,
):
    """
    Print an ``lndconnect://`` URI for remote wallets such as Zap.

    The macaroon and certificate are encoded by ``encodemacaroons()``; when
    that call does not report ``"OK"`` its status text is printed instead.

    :param hostname: host part placed right after ``lndconnect://``
    :param macaroonfile: path to the macaroon file
    :param tlsfile: path to the TLS certificate
    :return: None
    """
    encoded = encodemacaroons(macaroonfile, tlsfile)

    if encoded["status"] != "OK":
        print(encoded["status"])
        return

    macaroon_text = str(encoded["macaroon"], "utf-8")
    certificate_text = str(encoded["certificate"], "utf-8")
    uri_parts = [
        "lndconnect://",
        hostname,
        "?cert=",
        certificate_text,
        "&macaroon=",
        macaroon_text,
    ]
    print("".join(uri_parts))
def autounlock():
    """
    Auto-unlock lnd using password.txt, tls.cert

    Reads the wallet password from ``cfg.PASSWORD_FILE_PATH``, base64 encodes
    it and posts it to ``cfg.URL_UNLOCKWALLET``. This is best effort: request
    failures are ignored, and an undecodable response body is not an error.

    :return: None
    :raises OSError: if the password file cannot be read
    """
    # Context manager so the password file handle is always closed
    with open(cfg.PASSWORD_FILE_PATH, "r") as password_file:
        password_str = password_file.read().rstrip()
    password_bytes = password_str.encode("utf-8")
    data = {"wallet_password": base64.b64encode(password_bytes).decode()}

    try:
        response = post(
            cfg.URL_UNLOCKWALLET, verify=cfg.TLS_CERT_PATH, data=dumps(data)
        )
    except Exception:
        # Deliberately silent: connection errors are expected whenever lnd
        # is not running.
        return

    try:
        print(response.json())
    except ValueError:
        # JSON will fail to decode when unlocked already since the response
        # is empty (JSON decode errors are ValueError subclasses).
        pass
def get_kv(key, section="", config_path=""):
    """
    Parse a key-value config file and return the value stored for a key

    :param key: left part of key value pair
    :param section: [section] of the kv pair, defaults to
        "Application Options"
    :param config_path: path to config file, defaults to ``cfg.LND_CONF``
    :return: value of key
    :raises configparser.NoSectionError: if the section does not exist
    :raises configparser.NoOptionError: if the key does not exist
    """
    from configparser import ConfigParser

    if not config_path:
        config_path = cfg.LND_CONF
    if not section:
        section = "Application Options"

    # interpolation=None: values are returned literally. With the default
    # interpolation a value containing "%" (e.g. an RPC password) makes
    # parser.get() raise InterpolationSyntaxError.
    parser = ConfigParser(strict=False, interpolation=None)
    with open(config_path) as lines:
        parser.read_file(lines)
    return parser.get(section, key)
def set_kv(key, value, section="", config_path=""):
    """
    Parse a key-value config file and write it back with one key changed

    Note: comments are lost!

    :param key: left part of key value pair
    :param value: right part of key value pair (must be a string)
    :param section: optional name of section to set in, defaults to
        "Application Options"; a missing section is created
    :param config_path: path to file, defaults to ``cfg.LND_CONF``
    :return: None
    """
    from configparser import ConfigParser

    if not section:
        section = "Application Options"
    if not config_path:
        config_path = cfg.LND_CONF

    # interpolation=None: store values literally. With the default
    # interpolation parser.set() rejects values containing "%".
    parser = ConfigParser(strict=False, interpolation=None)
    with open(config_path) as lines:
        parser.read_file(lines)

    # Create the section on demand instead of failing with NoSectionError.
    # The default section always exists and cannot be added explicitly.
    if section != parser.default_section and not parser.has_section(section):
        parser.add_section(section)
    parser.set(section, key, value)

    # The with-statement closes the file; no explicit close() is needed
    with open(config_path, "w") as file:
        parser.write(file, space_around_delimiters=False)
def setup_tor(version=""):
    """
    Add tor hidden service to lnd

    Reads the hidden service hostname from
    ``/var/lib/tor/lnd-<version>/hostname`` and stores it as ``externalip``
    in the "Application Options" section via ``set_kv()``. Any error is
    printed instead of raised.

    :param version: hidden service version used in the path, defaults to "v3"
    :return: None
    """
    if not version:
        version = "v3"
    hostname_path = "/var/lib/tor/lnd-{}/hostname".format(version)
    try:
        print("Adding externalip directive to lnd for tor")
        with open(hostname_path, "r") as hostname:
            # Strip the trailing newline so it does not become part of the
            # config value.
            onion_address = hostname.read().strip()
        set_kv("externalip", onion_address, "Application Options")
    except Exception as error:
        print(error.__class__.__name__, ":", error)
def set_bitcoind(password, user="", lnd_config=""):
    """
    Store the bitcoind rpc username and password in the lnd config.

    Nothing happens when the config path is not an existing file.

    :param password: value written to ``bitcoind.rpcpass``
    :param user: value written to ``bitcoind.rpcuser``, "lncm" when empty
    :param lnd_config: config file path, ``cfg.LND_CONF`` when empty
    :return: None
    """
    rpc_user = user or "lncm"
    config_file = lnd_config or cfg.LND_CONF

    if not pathlib.Path(config_file).is_file():
        return

    credentials = (
        ("bitcoind.rpcuser", rpc_user),
        ("bitcoind.rpcpass", password),
    )
    for option, setting in credentials:
        set_kv(option, setting, "Bitcoind", config_file)
def autoconnect(list_path=""):
    """
    Auto-connect to a list of nodes in lnd/autoconnect.txt

    Every non-blank line of the list is passed to ``lncli connect`` inside
    the ``<cfg.LND_MODE>_lnd_1`` docker container.

    :param list_path: file with one node address per line, defaults to
        ``cfg.LND_PATH / "autoconnect.txt"``
    :return: None
    """
    print("Connecting to:")
    if not list_path:
        list_path = pathlib.Path(cfg.LND_PATH / "autoconnect.txt")

    with open(list_path) as address_list:
        for line in address_list:
            address = line.strip()
            if not address:
                # Blank lines would otherwise run `lncli connect ""`
                continue
            print(address)
            call(
                [
                    "docker",
                    "exec",
                    cfg.LND_MODE + "_lnd_1",
                    "lncli",
                    "connect",
                    address,
                ]
            )
def check():
    """
    Report whether the lnd directory and lnd.conf are present.

    One status line is printed for ``cfg.LND_PATH`` (directory) and one for
    ``cfg.LND_CONF`` (file).

    :return bool: True only when both exist
    """
    lnd_dir_present = cfg.LND_PATH.is_dir()
    print(
        "✅ lnd directory exists"
        if lnd_dir_present
        else "❌ lnd directory missing"
    )

    lnd_conf_present = cfg.LND_CONF.is_file()
    print("✅ lnd.conf exists" if lnd_conf_present else "❌ lnd.conf missing")

    return lnd_dir_present and lnd_conf_present
def backup():
    """
    Copy the channel backup file (``cfg.CHANNEL_BACKUP``) to a remote via scp

    :return int: exit status of the scp process
    :raises SystemExit: with status 1 when the backup file does not exist
    """
    if not cfg.CHANNEL_BACKUP.is_file():
        print("Error: channel.backup not found")
        # Same effect as the interactive-only exit() helper, without
        # depending on the site module.
        raise SystemExit(1)

    # scp options:
    # -B for non-interactive batch mode
    # -p to preserve modification & access time, modes
    # Each option and its value are separate argv entries; "-i path" as one
    # entry hands scp a value with a leading space.
    complete = run(
        [
            "scp",
            "-B",
            "-i",
            str(cfg.SSH_IDENTITY),
            "-p",
            "-P",
            str(cfg.SSH_PORT),
            str(cfg.CHANNEL_BACKUP),
            str(cfg.SSH_TARGET),
        ]
    )
    return complete.returncode
def savepeers():
    """Placeholder for saving the peer list to disk; only prints a notice."""
    # TODO: export list of peers to text file on disk
    notice = "Not implemented yet"
    print(notice)
def randompass(string_length=10):
    """
    Generate random password

    The password protects a wallet, so characters are drawn with the
    cryptographically secure ``secrets`` module rather than ``random``.

    :param string_length: number of characters to generate
    :return str: password made of ASCII letters
    """
    from secrets import choice
    from string import ascii_letters

    return "".join(choice(ascii_letters) for _ in range(string_length))
def _write_password(password_str):
    """
    Write a generated password to ``cfg.PASSWORD_FILE_PATH``.

    :param password_str: password text to store
    :return: None
    """
    # NOTE(review): the previous docstring described a separate temporary
    # password path chosen via SAVE_PASSWORD_CONTROL_FILE, but both branches
    # wrote to PASSWORD_FILE_PATH. That effective behavior is kept here;
    # confirm whether a distinct temporary location was intended.
    with open(cfg.PASSWORD_FILE_PATH, "w") as password_file:
        password_file.write(password_str)
def _wallet_password():
    """
    Return the wallet password, generating and saving one if necessary.

    If ``cfg.PASSWORD_FILE_PATH`` exists its content (without trailing
    whitespace) is returned. Otherwise a random 15 character password is
    generated, stored via ``_write_password()`` and returned.

    :return str: the wallet password
    """
    if path.exists(cfg.PASSWORD_FILE_PATH):
        # Context manager so the read handle is closed
        with open(cfg.PASSWORD_FILE_PATH, "r") as password_file:
            return password_file.read().rstrip()

    # Password file doesn't exist yet
    password_str = randompass(string_length=15)
    _write_password(password_str)
    return password_str
def _generate_and_save_seed():
    """
    Generate a wallet seed, save it to SEED_FILENAME, and return it

    The seed is requested from ``cfg.URL_GENSEED`` and written one word per
    line to ``cfg.SEED_FILENAME``.

    :return: the mnemonic from the response's ``cipher_seed_mnemonic``
        field, or None when the request did not answer with status 200
    """
    return_data = get(cfg.URL_GENSEED, verify=cfg.TLS_CERT_PATH)
    if return_data.status_code != 200:
        # No seed could be created; the caller handles the missing value
        return None

    mnemonic = return_data.json()["cipher_seed_mnemonic"]
    # Context manager so the seed file is closed even if a write fails
    with open(cfg.SEED_FILENAME, "w") as seed_file:
        seed_file.writelines(word + "\n" for word in mnemonic)
    return mnemonic
def _load_seed():
    """
    Load the wallet seed from SEED_FILENAME and return it

    :return list: seed words, one per non-blank line of the file
    """
    # Context manager closes the file. strip() also removes "\r" from CRLF
    # files, and blank lines are skipped so no empty word ends up in the
    # mnemonic.
    with open(cfg.SEED_FILENAME, "r") as seed_file:
        stripped_lines = (line.strip() for line in seed_file)
        return [word for word in stripped_lines if word]
def _wallet_data(password_str):
    """
    Assemble the wallet ``data`` dict holding the mnemonic and the password.

    The mnemonic is loaded from ``cfg.SEED_FILENAME`` when that file exists,
    otherwise a new seed is generated and saved.

    :param password_str: wallet password in clear text
    :return dict: ``cipher_seed_mnemonic`` and base64 ``wallet_password``,
        or an empty dict when no mnemonic is available
    """
    # Byte-encode the password up front
    encoded_password = str(password_str).encode("utf-8")

    if path.exists(cfg.SEED_FILENAME):
        seed_words = _load_seed()
    else:
        # No seed file yet: request a freshly generated seed
        seed_words = _generate_and_save_seed()

    if not seed_words:
        return {}

    return {
        "cipher_seed_mnemonic": seed_words,
        "wallet_password": base64.b64encode(encoded_password).decode(),
    }
def create_wallet():
    """
    Create the LND wallet through the init wallet endpoint.

    1. Obtain the wallet password via ``_wallet_password()``.
    2. Build the request data (seed mnemonic and encoded password) via
       ``_wallet_data()``.
    3. If data is available, post it as JSON to ``cfg.URL_INITWALLET`` and
       print whether the response status was 200; otherwise print an error.

    :return: None
    """
    wallet_payload = _wallet_data(_wallet_password())

    if not wallet_payload:
        print("❌ Error: cannot proceed, wallet data is not defined")
        return

    # Payload is defined so proceed with the wallet creation request
    response = post(
        cfg.URL_INITWALLET,
        verify=cfg.TLS_CERT_PATH,
        data=dumps(wallet_payload),
    )
    outcome = (
        "✅ Create wallet is successful"
        if response.status_code == 200
        else "❌ Create wallet is not successful"
    )
    print(outcome)
# This module only provides helper functions; executing it as a script just
# prints a notice.
if __name__ == "__main__":
    print("This file is not meant to be run directly")