diff --git a/CHANGELOG.md b/CHANGELOG.md index c9f15e61f..025bfaa32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ - Users of the VSS storage backend must upgrade their VSS server to at least version `v0.1.0-alpha.0` before upgrading LDK Node. +## Serialization Compatibility +- The `counterparty_node_id` field of the `ChannelReady` and `ChannelClosed` events is now + required. Events persisted by LDK Node v0.1.0 and prior that are missing this field will + fail to deserialize. + # 0.7.0 - Dec. 3, 2025 This seventh minor release introduces numerous new features, bug fixes, and API improvements. In particular, it adds support for channel Splicing, Async Payments, as well as sourcing chain data from a Bitcoin Core REST backend. diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 851583c5a..a6f341847 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -138,6 +138,7 @@ interface Node { sequence list_payments(); sequence list_peers(); sequence list_channels(); + sequence list_closed_channels(); NetworkGraph network_graph(); string sign_message([ByRef]sequence msg); boolean verify_signature([ByRef]sequence msg, [ByRef]string sig, [ByRef]PublicKey pkey); @@ -321,6 +322,8 @@ dictionary OutPoint { typedef dictionary ChannelDetails; +typedef dictionary ClosedChannelDetails; + typedef dictionary PeerDetails; [Remote] diff --git a/src/builder.rs b/src/builder.rs index 3df594b7c..fdf1ef8fd 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,7 +64,11 @@ use crate::io::utils::{ }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, + CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -77,9 +81,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, - GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, PaymentStore, - PeerManager, PendingPaymentStore, + AsyncPersister, ChainMonitor, ChannelManager, ChannelRecordStore, ClosedChannelStore, DynStore, + DynStoreRef, DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, + OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1379,24 +1383,41 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = - runtime.block_on(async move { - tokio::join!( - read_all_objects( - &*kv_store_ref, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_all_objects( - &*kv_store_ref, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ) - ) - }); + let ( + payment_store_res, + node_metris_res, + pending_payment_store_res, + closed_channel_store_res, + channel_record_store_res, + ) = runtime.block_on(async move { + tokio::join!( + read_all_objects( + &*kv_store_ref, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + read_all_objects( + &*kv_store_ref, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + ) + }); // Initialize the status fields. let node_metrics = match node_metris_res { @@ -1425,6 +1446,20 @@ fn build_with_store_internal( }, }; + let closed_channel_store = match closed_channel_store_res { + Ok(channels) => Arc::new(ClosedChannelStore::new( + channels, + CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read closed channel data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -1605,6 +1640,20 @@ fn build_with_store_internal( }, }; + let channel_record_store = match channel_record_store_res { + Ok(channel_records) => Arc::new(ChannelRecordStore::new( + channel_records, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel record data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let wallet = Arc::new(Wallet::new( bdk_wallet, wallet_persister, @@ -2149,6 +2198,8 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + closed_channel_store, + channel_record_store, lnurl_auth, is_running, node_metrics, diff --git a/src/channel/mod.rs b/src/channel/mod.rs new file mode 100644 index 000000000..95a9f5224 --- /dev/null +++ b/src/channel/mod.rs @@ -0,0 +1,10 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Per-channel state tracking. + +pub(crate) mod store; diff --git a/src/channel/store.rs b/src/channel/store.rs new file mode 100644 index 000000000..0d2969218 --- /dev/null +++ b/src/channel/store.rs @@ -0,0 +1,130 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::secp256k1::PublicKey; +use lightning::impl_writeable_tlv_based_enum; +use lightning::ln::types::ChannelId; + +use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::hex_utils; +use crate::types::UserChannelId; + +/// Persistent per-channel state tracked by LDK Node, keyed by [`UserChannelId`]. +/// +/// Durably stores channel flags at `ChannelPending` time so they remain accessible when the +/// channel closes, even after a restart or a [`ReplayEvent`]. The `Funded` variant is designed +/// to be extended with a `pending_splice` field in a future PR to support splice retry across +/// restarts and peer disconnects. +/// +/// [`ReplayEvent`]: lightning::events::ReplayEvent +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum ChannelRecord { + /// State for a live channel whose funding transaction exists. + Funded { + user_channel_id: UserChannelId, + /// The node ID of the channel counterparty. + counterparty_node_id: PublicKey, + /// The channel's ID at the time the `ChannelPending` event fired. + channel_id: ChannelId, + /// Whether we opened the channel (outbound) or the counterparty did (inbound). + is_outbound: bool, + /// Whether the channel was publicly announced. + is_announced: bool, + }, +} + +impl_writeable_tlv_based_enum!(ChannelRecord, + (0, Funded) => { + (0, user_channel_id, required), + (2, counterparty_node_id, required), + (4, channel_id, required), + (6, is_outbound, required), + (8, is_announced, required), + }, +); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelRecordUpdate { + pub user_channel_id: UserChannelId, +} + +impl StorableObjectUpdate for ChannelRecordUpdate { + fn id(&self) -> UserChannelId { + self.user_channel_id + } +} + +impl StorableObject for ChannelRecord { + type Id = UserChannelId; + type Update = ChannelRecordUpdate; + + fn id(&self) -> UserChannelId { + match self { + ChannelRecord::Funded { user_channel_id, .. } => *user_channel_id, + } + } + + fn update(&mut self, _update: Self::Update) -> bool { + // ChannelRecord fields are immutable once written in this version. Returning false + // makes insert_or_update a no-op when the record already exists, ensuring idempotency + // on ChannelPending replay. + false + } + + fn to_update(&self) -> Self::Update { + ChannelRecordUpdate { user_channel_id: self.id() } + } +} + +impl StorableObjectId for UserChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0.to_be_bytes()) + } +} + +#[cfg(test)] +mod tests { + use lightning::ln::types::ChannelId; + use lightning::util::ser::{Readable, Writeable}; + + use super::*; + + fn make_record(is_outbound: bool, is_announced: bool) -> ChannelRecord { + let user_channel_id = UserChannelId(42); + // A valid compressed public key: prefix 0x02 followed by 32 bytes. + let counterparty_node_id = PublicKey::from_slice(&[2u8; 33]).expect("valid pubkey"); + let channel_id = ChannelId([3u8; 32]); + ChannelRecord::Funded { + user_channel_id, + counterparty_node_id, + channel_id, + is_outbound, + is_announced, + } + } + + #[test] + fn channel_record_roundtrips() { + for (is_outbound, is_announced) in + [(true, false), (false, true), (true, true), (false, false)] + { + let record = make_record(is_outbound, is_announced); + let encoded = record.encode(); + let decoded = ChannelRecord::read(&mut &encoded[..]).expect("decode succeeds"); + assert_eq!(record, decoded); + assert_eq!(decoded.id(), UserChannelId(42)); + assert!(matches!( + decoded, + ChannelRecord::Funded { + is_outbound: dec_out, + is_announced: dec_ann, + .. + } if dec_out == is_outbound && dec_ann == is_announced + )); + } + } +} diff --git a/src/closed_channel.rs b/src/closed_channel.rs new file mode 100644 index 000000000..0bd00665e --- /dev/null +++ b/src/closed_channel.rs @@ -0,0 +1,98 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::secp256k1::PublicKey; +use bitcoin::OutPoint; +use lightning::events::ClosureReason; +use lightning::impl_writeable_tlv_based; +use lightning::ln::types::ChannelId; + +use crate::data_store::{StorableObject, StorableObjectUpdate}; +use crate::types::UserChannelId; + +/// Details of a closed channel. +/// +/// Returned by [`Node::list_closed_channels`]. +/// +/// [`Node::list_closed_channels`]: crate::Node::list_closed_channels +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ClosedChannelDetails { + /// The channel's ID at the time it was closed. + pub channel_id: ChannelId, + /// The local identifier of the channel. + pub user_channel_id: UserChannelId, + /// The node ID of the channel's counterparty. + pub counterparty_node_id: PublicKey, + /// The channel's funding transaction outpoint. + /// + /// Will be `None` if the channel was closed before a funding transaction was established. + pub funding_txo: Option, + /// The channel's capacity in satoshis. + /// + /// Will be `None` if the channel was closed before the capacity was known. + pub channel_capacity_sats: Option, + /// Our local balance in millisatoshis at the time of channel closure. + /// + /// Will be `None` if the local balance was not available at the time of closure. + pub last_local_balance_msat: Option, + /// Indicates whether we initiated the channel opening. + /// + /// `true` if the channel was opened by us (outbound), `false` if opened by the counterparty + /// (inbound). This will be `false` for channels opened prior to this field being tracked. + pub is_outbound: bool, + /// Indicates whether the channel was publicly announced. + /// + /// This will be `false` for channels opened prior to this field being tracked. + pub is_announced: bool, + /// The reason for the channel closure. + /// + /// Will be `None` if the closure reason could not be decoded, e.g., if it was written by a + /// future version of LDK Node using a closure reason variant not yet known to this version. + pub closure_reason: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when the channel was closed. + pub closed_at: u64, +} + +impl_writeable_tlv_based!(ClosedChannelDetails, { + (0, channel_id, required), + (2, user_channel_id, required), + (4, counterparty_node_id, required), + (6, funding_txo, option), + (8, channel_capacity_sats, option), + (10, last_local_balance_msat, option), + (12, is_outbound, required), + (14, closure_reason, upgradable_option), + (16, closed_at, required), + (18, is_announced, required), +}); + +pub(crate) struct ClosedChannelDetailsUpdate(pub UserChannelId); + +impl StorableObjectUpdate for ClosedChannelDetailsUpdate { + fn id(&self) -> UserChannelId { + self.0 + } +} + +impl StorableObject for ClosedChannelDetails { + type Id = UserChannelId; + type Update = ClosedChannelDetailsUpdate; + + fn id(&self) -> UserChannelId { + self.user_channel_id + } + + fn update(&mut self, _update: Self::Update) -> bool { + // Closed channel records are immutable once written. + false + } + + fn to_update(&self) -> Self::Update { + ClosedChannelDetailsUpdate(self.user_channel_id) + } +} diff --git a/src/event.rs b/src/event.rs index 80acd0690..dd27cf6eb 100644 --- a/src/event.rs +++ b/src/event.rs @@ -7,9 +7,10 @@ use core::future::Future; use core::task::{Poll, Waker}; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -33,6 +34,8 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; +use crate::channel::store::ChannelRecord; +use crate::closed_channel::ClosedChannelDetails; use crate::config::{may_announce_channel, Config}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; @@ -53,7 +56,8 @@ use crate::payment::store::{ use crate::payment::PaymentMetadata; use crate::runtime::Runtime; use crate::types::{ - CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, + ChannelRecordStore, ClosedChannelStore, CustomTlvRecord, DynStore, KeysManager, OnionMessenger, + PaymentStore, Sweeper, Wallet, }; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, @@ -246,9 +250,7 @@ pub enum Event { /// The `user_channel_id` of the channel. user_channel_id: UserChannelId, /// The `node_id` of the channel counterparty. - /// - /// This will be `None` for events serialized by LDK Node v0.1.0 and prior. - counterparty_node_id: Option, + counterparty_node_id: PublicKey, /// The outpoint of the channel's funding transaction. /// /// This represents the channel's current funding output, which may change when the @@ -265,11 +267,21 @@ pub enum Event { /// The `user_channel_id` of the channel. user_channel_id: UserChannelId, /// The `node_id` of the channel counterparty. - /// - /// This will be `None` for events serialized by LDK Node v0.1.0 and prior. - counterparty_node_id: Option, + counterparty_node_id: PublicKey, /// This will be `None` for events serialized by LDK Node v0.2.1 and prior. reason: Option, + /// The channel's capacity in satoshis. + /// + /// This will be `None` for events serialized by LDK Node v0.8.0 and prior. + channel_capacity_sats: Option, + /// The channel's funding transaction outpoint. + /// + /// This will be `None` for events serialized by LDK Node v0.8.0 and prior. + channel_funding_txo: Option, + /// Our local balance in millisatoshis at the time of channel closure. + /// + /// This will be `None` for events serialized by LDK Node v0.8.0 and prior. + last_local_balance_msat: Option, }, /// A channel splice has been negotiated and the funding transaction is pending /// confirmation on-chain. @@ -315,7 +327,7 @@ impl_writeable_tlv_based_enum!(Event, }, (3, ChannelReady) => { (0, channel_id, required), - (1, counterparty_node_id, option), + (1, counterparty_node_id, required), (2, user_channel_id, required), (3, funding_txo, option), }, @@ -328,9 +340,12 @@ impl_writeable_tlv_based_enum!(Event, }, (5, ChannelClosed) => { (0, channel_id, required), - (1, counterparty_node_id, option), + (1, counterparty_node_id, required), (2, user_channel_id, required), (3, reason, upgradable_option), + (5, channel_capacity_sats, option), + (7, channel_funding_txo, option), + (9, last_local_balance_msat, option), }, (6, PaymentClaimable) => { (0, payment_hash, required), @@ -536,6 +551,14 @@ where liquidity_source: Arc>>, payment_store: Arc, peer_store: Arc>, + closed_channel_store: Arc, + channel_record_store: Arc, + // Tracks which user_channel_ids correspond to outbound channels. Populated at startup from + // list_channels() and updated on ChannelPending events. Consumed on ChannelClosed events. + outbound_channel_ids: Mutex>, + // Tracks which user_channel_ids correspond to announced channels. Populated at startup from + // list_channels() and updated on ChannelPending events. Consumed on ChannelClosed events. + announced_channel_ids: Mutex>, keys_manager: Arc, runtime: Arc, logger: L, @@ -555,11 +578,28 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Arc>>, payment_store: Arc, - peer_store: Arc>, keys_manager: Arc, + peer_store: Arc>, closed_channel_store: Arc, + channel_record_store: Arc, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, runtime: Arc, logger: L, config: Arc, ) -> Self { + // Seed outbound_channel_ids and announced_channel_ids from currently open channels so we + // correctly classify channels that were already open when this node started. + let (outbound_channel_ids, announced_channel_ids) = { + let mut outbound = HashSet::new(); + let mut announced = HashSet::new(); + for chan in channel_manager.list_channels() { + if chan.is_outbound { + outbound.insert(UserChannelId(chan.user_channel_id)); + } + if chan.is_announced { + announced.insert(UserChannelId(chan.user_channel_id)); + } + } + (Mutex::new(outbound), Mutex::new(announced)) + }; + Self { event_queue, wallet, @@ -571,6 +611,10 @@ where liquidity_source, payment_store, peer_store, + closed_channel_store, + channel_record_store, + outbound_channel_ids, + announced_channel_ids, keys_manager, logger, runtime, @@ -1346,12 +1390,23 @@ where 100 ); } + // For LSPS2 JIT channels (channel_override_config is Some iff the counterparty + // is our configured LSP), accept with ZeroConfZeroReserve so the LSP is not + // forced to keep 1000 sats locked as reserve. Without this, the hard + // MIN_THEIR_CHAN_RESERVE_SATOSHIS = 1000 floor in LDK reduces the usable + // outbound capacity enough that the initial HTLC forward fails on small channels. + let is_lsps2_channel = channel_override_config.is_some(); let res = if allow_0conf { + let trusted_features = if is_lsps2_channel { + TrustedChannelFeatures::ZeroConfZeroReserve + } else { + TrustedChannelFeatures::ZeroConf + }; self.channel_manager.accept_inbound_channel_from_trusted_peer( &temporary_channel_id, &counterparty_node_id, user_channel_id, - TrustedChannelFeatures::ZeroConf, + trusted_features, channel_override_config, ) } else { @@ -1527,15 +1582,39 @@ where }, }; - let peer_to_store = { + let (record_opt, peer_to_store) = { let network_graph = self.network_graph.read_only(); let channels = self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); - channels - .into_iter() - .find(|c| c.channel_id == channel_id) - .filter(|pending_channel| { - !pending_channel.is_outbound + let pending_channel = channels.into_iter().find(|c| c.channel_id == channel_id); + + let record_opt = if let Some(ref ch) = pending_channel { + if ch.is_outbound { + self.outbound_channel_ids + .lock() + .expect("Lock poisoned") + .insert(UserChannelId(user_channel_id)); + } + if ch.is_announced { + self.announced_channel_ids + .lock() + .expect("Lock poisoned") + .insert(UserChannelId(user_channel_id)); + } + Some(ChannelRecord::Funded { + user_channel_id: UserChannelId(user_channel_id), + counterparty_node_id, + channel_id, + is_outbound: ch.is_outbound, + is_announced: ch.is_announced, + }) + } else { + None + }; + + let peer_to_store = pending_channel + .filter(|ch| { + !ch.is_outbound && self.peer_store.get_peer(&counterparty_node_id).is_none() }) .and_then(|_| { @@ -1548,8 +1627,22 @@ where node_id: counterparty_node_id, address: address.clone(), }) - }) - }; + }); + + (record_opt, peer_to_store) + }; // network_graph is dropped here, before any await + + if let Some(record) = record_opt { + if let Err(e) = self.channel_record_store.insert_or_update(record).await { + log_error!( + self.logger, + "Failed to persist channel record for {}: {}", + channel_id, + e + ); + return Err(ReplayEvent()); + } + } if let Some(peer) = peer_to_store { self.peer_store.add_peer(peer).await.unwrap_or_else(|e| { log_error!( @@ -1593,7 +1686,7 @@ where let event = Event::ChannelReady { channel_id, user_channel_id: UserChannelId(user_channel_id), - counterparty_node_id: Some(counterparty_node_id), + counterparty_node_id, funding_txo, }; match self.event_queue.add_event(event).await { @@ -1609,19 +1702,100 @@ where reason, user_channel_id, counterparty_node_id, - .. + channel_capacity_sats, + channel_funding_txo, + last_local_balance_msat, } => { log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); + let user_channel_id = UserChannelId(user_channel_id); + let is_outbound_from_set = self + .outbound_channel_ids + .lock() + .expect("Lock poisoned") + .remove(&user_channel_id); + let is_announced_from_set = self + .announced_channel_ids + .lock() + .expect("Lock poisoned") + .remove(&user_channel_id); + + // Primary: use the durably-persisted ChannelRecord written at ChannelPending + // time. Falls back to in-memory sets (populated at startup or on + // ChannelPending), then to any already-persisted ClosedChannelDetails record + // (for the replay case where insert_or_update already succeeded but + // add_event failed and the ChannelRecord was already cleaned up). + let (is_outbound, is_announced) = self + .channel_record_store + .get(&user_channel_id) + .and_then(|record| match record { + ChannelRecord::Funded { is_outbound, is_announced, .. } => { + Some((is_outbound, is_announced)) + }, + }) + .or_else(|| { + self.closed_channel_store + .get(&user_channel_id) + .map(|existing| (existing.is_outbound, existing.is_announced)) + }) + .unwrap_or((is_outbound_from_set, is_announced_from_set)); + + let closed_at = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_secs(); + + let funding_txo = channel_funding_txo.map(|op| op.into_bitcoin_outpoint()); + + // Since LDK Node v0.2 this is expected to always be set. See + // CHANGELOG.md for details on the serialization compatibility break. + let counterparty_node_id = counterparty_node_id + .expect("counterparty_node_id must be set for closed channels"); + + let record = ClosedChannelDetails { + channel_id, + user_channel_id, + counterparty_node_id, + funding_txo, + channel_capacity_sats, + last_local_balance_msat, + is_outbound, + is_announced, + closure_reason: Some(reason.clone()), + closed_at, + }; + + if let Err(e) = self.closed_channel_store.insert_or_update(record).await { + log_error!( + self.logger, + "Failed to persist closed channel {}: {}", + channel_id, + e + ); + return Err(ReplayEvent()); + } + let event = Event::ChannelClosed { channel_id, - user_channel_id: UserChannelId(user_channel_id), + user_channel_id, counterparty_node_id, reason: Some(reason), + channel_capacity_sats, + channel_funding_txo: funding_txo, + last_local_balance_msat, }; match self.event_queue.add_event(event).await { - Ok(_) => {}, + Ok(_) => { + if let Err(e) = self.channel_record_store.remove(&user_channel_id).await { + log_error!( + self.logger, + "Failed to remove channel record for {}: {}", + channel_id, + e + ); + } + }, Err(e) => { log_error!(self.logger, "Failed to push to event queue: {}", e); return Err(ReplayEvent()); @@ -1914,6 +2088,7 @@ mod tests { use std::sync::atomic::{AtomicU16, Ordering}; use std::time::Duration; + use bitcoin::secp256k1::{Secp256k1, SecretKey}; use lightning::util::test_utils::TestLogger; use super::*; @@ -1977,10 +2152,13 @@ mod tests { let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); + let secp = Secp256k1::new(); + let counterparty_node_id = + PublicKey::from_secret_key(&secp, &SecretKey::from_slice(&[1u8; 32]).unwrap()); let expected_event = Event::ChannelReady { channel_id: ChannelId([23u8; 32]), user_channel_id: UserChannelId(2323), - counterparty_node_id: None, + counterparty_node_id, funding_txo: None, }; event_queue.add_event(expected_event.clone()).await.unwrap(); @@ -2015,10 +2193,13 @@ mod tests { let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); + let secp = Secp256k1::new(); + let counterparty_node_id = + PublicKey::from_secret_key(&secp, &SecretKey::from_slice(&[1u8; 32]).unwrap()); let expected_event = Event::ChannelReady { channel_id: ChannelId([23u8; 32]), user_channel_id: UserChannelId(2323), - counterparty_node_id: None, + counterparty_node_id, funding_txo: None, }; diff --git a/src/io/mod.rs b/src/io/mod.rs index e16a99975..b37fb22f8 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -29,6 +29,14 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The closed channel information will be persisted under this prefix. +pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "closed_channels"; +pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The per-channel records will be persisted under this prefix. +pub(crate) const CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE: &str = "channel_records"; +pub(crate) const CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index b45064287..c2cae5283 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,8 @@ mod balance; mod builder; mod chain; +mod channel; +pub(crate) mod closed_channel; pub mod config; mod connection; mod data_store; @@ -128,6 +130,7 @@ pub use builder::BuildError; #[cfg(not(feature = "uniffi"))] pub use builder::NodeBuilder as Builder; use chain::ChainSource; +pub use closed_channel::ClosedChannelDetails; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, @@ -175,9 +178,9 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ChannelRecordStore, + ClosedChannelStore, DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, + PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; pub use vss_client; @@ -242,6 +245,8 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + closed_channel_store: Arc, + channel_record_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc, @@ -604,6 +609,8 @@ impl Node { Arc::clone(&self.liquidity_source), Arc::clone(&self.payment_store), Arc::clone(&self.peer_store), + Arc::clone(&self.closed_channel_store), + Arc::clone(&self.channel_record_store), Arc::clone(&self.keys_manager), static_invoice_store, Arc::clone(&self.onion_messenger), @@ -1148,6 +1155,13 @@ impl Node { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() } + /// Retrieve a list of closed channels. + /// + /// Channels are added to this list when they are closed and will be persisted across restarts. + pub fn list_closed_channels(&self) -> Vec { + self.closed_channel_store.list_filter(|_| true) + } + /// Connect to a node on the peer-to-peer network. /// /// If `persist` is set to `true`, we'll remember the peer and reconnect to it on restart. diff --git a/src/types.rs b/src/types.rs index 64209430b..4253c5733 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,6 +38,8 @@ use lightning_net_tokio::SocketDescriptor; use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; +use crate::channel::store::ChannelRecord; +use crate::closed_channel::ClosedChannelDetails; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; @@ -318,7 +320,7 @@ pub(crate) type PaymentStore = DataStore>; /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct UserChannelId(pub u128); impl Writeable for UserChannelId { @@ -628,3 +630,7 @@ impl From<&(u64, Vec)> for CustomTlvRecord { } pub(crate) type PendingPaymentStore = DataStore>; + +pub(crate) type ClosedChannelStore = DataStore>; + +pub(crate) type ChannelRecordStore = DataStore>; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index adeb327bf..2c599de72 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -133,7 +133,7 @@ macro_rules! expect_channel_ready_event { match event { ref e @ Event::ChannelReady { user_channel_id, counterparty_node_id, .. } => { println!("{} got event {:?}", $node.node_id(), e); - assert_eq!(counterparty_node_id, Some($counterparty_node_id)); + assert_eq!(counterparty_node_id, $counterparty_node_id); $node.event_handled().unwrap(); user_channel_id }, @@ -170,8 +170,7 @@ macro_rules! expect_channel_ready_events { } } assert!( - ids.contains(&Some($counterparty_node_id_a)) - && ids.contains(&Some($counterparty_node_id_b)), + ids.contains(&$counterparty_node_id_a) && ids.contains(&$counterparty_node_id_b), "Expected ChannelReady events from {:?} and {:?}, but got {:?}", $counterparty_node_id_a, $counterparty_node_id_b, @@ -1638,6 +1637,23 @@ pub(crate) async fn do_channel_full_cycle( assert_eq!(node_a.next_event(), None); assert_eq!(node_b.next_event(), None); + // Check that the closed channel record was persisted. + let closed_a = node_a.list_closed_channels(); + let closed_b = node_b.list_closed_channels(); + assert_eq!(closed_a.len(), 1); + assert_eq!(closed_b.len(), 1); + assert!(closed_a[0].channel_capacity_sats.is_some()); + assert!(closed_b[0].channel_capacity_sats.is_some()); + assert!(closed_a[0].is_outbound, "node_a opened the channel, should be outbound"); + assert!(!closed_b[0].is_outbound, "node_b received the channel, should be inbound"); + assert!(closed_a[0].closure_reason.is_some()); + assert!(closed_b[0].closure_reason.is_some()); + assert!(closed_a[0].funding_txo.is_some()); + assert!(closed_b[0].funding_txo.is_some()); + assert_eq!(closed_a[0].funding_txo, closed_b[0].funding_txo); + assert_eq!(closed_a[0].counterparty_node_id, node_b.node_id()); + assert_eq!(closed_b[0].counterparty_node_id, node_a.node_id()); + node_a.stop().unwrap(); println!("\nA stopped"); node_b.stop().unwrap(); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 5b07ab50d..c7d360ec0 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -35,7 +35,7 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, UnifiedPaymentResult, }; -use ldk_node::{Builder, Event, NodeError}; +use ldk_node::{Builder, ClosedChannelDetails, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; @@ -3140,3 +3140,111 @@ async fn do_lsps2_multi_lsp_picks_cheapest(reverse_order: bool) { cheap.stop().unwrap(); expensive.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn closed_channel_history_persists_after_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + println!("== Node A =="); + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + + println!("\n== Node B =="); + let mut config_b = random_config(true); + config_b.store_type = TestStoreType::Sqlite; + + let channel_amount_sat = 1_000_000; + let premine_amount_sat = 2_125_000; + + let closed_channel_before: ClosedChannelDetails; + + { + let node_a = setup_node(&chain_source, config_a.clone()); + let node_b = setup_node(&chain_source, config_b.clone()); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Open channel from A to B. + let funding_txo = open_channel(&node_a, &node_b, channel_amount_sat, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Cooperatively close. + let user_channel_id = node_a + .list_channels() + .into_iter() + .find(|c| c.counterparty_node_id == node_b.node_id()) + .map(|c| c.user_channel_id) + .expect("open channel not found"); + node_a.close_channel(&user_channel_id, node_b.node_id()).unwrap(); + expect_event!(node_a, ChannelClosed); + expect_event!(node_b, ChannelClosed); + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify the record is present before restart. + let closed_a = node_a.list_closed_channels(); + assert_eq!(closed_a.len(), 1); + let record = closed_a.into_iter().next().unwrap(); + assert_eq!(record.channel_capacity_sats, Some(channel_amount_sat)); + assert!(record.is_outbound); + assert!(record.is_announced); + assert_eq!(record.counterparty_node_id, node_b.node_id()); + assert!(record.funding_txo.is_some()); + assert_eq!(record.funding_txo.unwrap().txid, funding_txo.txid); + assert!(record.closure_reason.is_some()); + + closed_channel_before = record; + + let closed_b = node_b.list_closed_channels(); + assert_eq!(closed_b.len(), 1); + let record_b = closed_b.into_iter().next().unwrap(); + assert_eq!(record_b.channel_capacity_sats, Some(channel_amount_sat)); + assert!(!record_b.is_outbound); + assert!(record_b.is_announced); + assert_eq!(record_b.counterparty_node_id, node_a.node_id()); + assert!(record_b.funding_txo.is_some()); + assert_eq!(record_b.funding_txo.unwrap().txid, funding_txo.txid); + assert!(record_b.closure_reason.is_some()); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + } + + // Restart node_a with the same config and verify the record survived. + println!("\nRestarting node A..."); + let restarted_node_a = setup_node(&chain_source, config_a); + + let closed_after = restarted_node_a.list_closed_channels(); + assert_eq!(closed_after.len(), 1, "closed channel record should survive restart"); + + let record = &closed_after[0]; + assert_eq!(record.channel_id, closed_channel_before.channel_id); + assert_eq!(record.user_channel_id, closed_channel_before.user_channel_id); + assert_eq!(record.channel_capacity_sats, closed_channel_before.channel_capacity_sats); + assert_eq!(record.funding_txo, closed_channel_before.funding_txo); + assert_eq!(record.counterparty_node_id, closed_channel_before.counterparty_node_id); + assert_eq!(record.is_outbound, closed_channel_before.is_outbound); + assert_eq!(record.is_announced, closed_channel_before.is_announced); + assert_eq!(record.closed_at, closed_channel_before.closed_at); + assert_eq!(record.closure_reason, closed_channel_before.closure_reason); + + restarted_node_a.stop().unwrap(); +} diff --git a/tests/reorg_test.rs b/tests/reorg_test.rs index 295d9fdd2..f2c3b6586 100644 --- a/tests/reorg_test.rs +++ b/tests/reorg_test.rs @@ -112,11 +112,11 @@ proptest! { let next_node = nodes.get((i + 1) % nodes.len()).unwrap(); let prev_node = nodes.get((i + nodes.len() - 1) % nodes.len()).unwrap(); - assert!(user_channels.get(&Some(next_node.node_id())) != None); - assert!(user_channels.get(&Some(prev_node.node_id())) != None); + assert!(user_channels.get(&next_node.node_id()) != None); + assert!(user_channels.get(&prev_node.node_id()) != None); let user_channel_id = - user_channels.get(&Some(next_node.node_id())).expect("Missing user channel for node"); + user_channels.get(&next_node.node_id()).expect("Missing user channel for node"); node_channels_id.insert(node.node_id(), *user_channel_id); } diff --git a/tests/upgrade_downgrade_tests.rs b/tests/upgrade_downgrade_tests.rs index b30b5a33c..c3ee6868e 100644 --- a/tests/upgrade_downgrade_tests.rs +++ b/tests/upgrade_downgrade_tests.rs @@ -290,7 +290,7 @@ async fn expect_current_channel_pending( async fn expect_current_channel_ready(node: &CurrentNode, expected_counterparty: PublicKey) { match next_current_event(node).await { ldk_node::Event::ChannelReady { counterparty_node_id, .. } => { - assert_eq!(counterparty_node_id, Some(expected_counterparty)); + assert_eq!(counterparty_node_id, expected_counterparty); node.event_handled().unwrap(); }, event => panic!("{} got unexpected event: {:?}", node.node_id(), event),