From 55c9014387aabe09a86838e08d5889046204b560 Mon Sep 17 00:00:00 2001 From: Fernando Ledesma Date: Mon, 20 Apr 2026 11:13:58 -0500 Subject: [PATCH 1/6] Add persistent closed channel history and `list_closed_channels()` Introduce `ClosedChannelDetails`, a new record type persisted to the KV store under the `"closed_channels"` namespace whenever a channel closes. Records are written in the `ChannelClosed` event handler and loaded back at startup in parallel with other stores via `tokio::join!`. Add `Node::list_closed_channels()` to expose the full history of closed channels across restarts. Track outbound channel direction via an in-memory `outbound_channel_ids` set seeded from `channel_manager.list_channels()` at startup and updated on `ChannelPending` events, since `ChannelClosed` does not carry that information directly. --- bindings/ldk_node.udl | 3 + src/builder.rs | 33 ++++++-- src/closed_channel.rs | 98 ++++++++++++++++++++++++ src/event.rs | 132 ++++++++++++++++++++++++++++---- src/io/mod.rs | 4 + src/lib.rs | 17 +++- src/types.rs | 5 +- tests/common/mod.rs | 17 ++++ tests/integration_tests_rust.rs | 110 +++++++++++++++++++++++++- 9 files changed, 396 insertions(+), 23 deletions(-) create mode 100644 src/closed_channel.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 851583c5a..a6f341847 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -138,6 +138,7 @@ interface Node { sequence list_payments(); sequence list_peers(); sequence list_channels(); + sequence list_closed_channels(); NetworkGraph network_graph(); string sign_message([ByRef]sequence msg); boolean verify_signature([ByRef]sequence msg, [ByRef]string sig, [ByRef]PublicKey pkey); @@ -321,6 +322,8 @@ dictionary OutPoint { typedef dictionary ChannelDetails; +typedef dictionary ClosedChannelDetails; + typedef dictionary PeerDetails; [Remote] diff --git a/src/builder.rs b/src/builder.rs index 3df594b7c..87f297dc2 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,7 +64,9 @@ use crate::io::utils::{ }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -77,9 +79,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, - GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, PaymentStore, - PeerManager, PendingPaymentStore, + AsyncPersister, ChainMonitor, ChannelManager, ClosedChannelStore, DynStore, DynStoreRef, + DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, + PaymentStore, PeerManager, PendingPaymentStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1379,7 +1381,7 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = + let (payment_store_res, node_metris_res, pending_payment_store_res, closed_channel_store_res) = runtime.block_on(async move { tokio::join!( read_all_objects( @@ -1394,6 +1396,12 @@ fn build_with_store_internal( PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), ) ) }); @@ -1425,6 +1433,20 @@ fn build_with_store_internal( }, }; + let closed_channel_store = match closed_channel_store_res { + Ok(channels) => Arc::new(ClosedChannelStore::new( + channels, + CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read closed channel data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -2149,6 +2171,7 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + closed_channel_store, lnurl_auth, is_running, node_metrics, diff --git a/src/closed_channel.rs b/src/closed_channel.rs new file mode 100644 index 000000000..adbcf855b --- /dev/null +++ b/src/closed_channel.rs @@ -0,0 +1,98 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use bitcoin::secp256k1::PublicKey; +use bitcoin::OutPoint; +use lightning::events::ClosureReason; +use lightning::impl_writeable_tlv_based; +use lightning::ln::types::ChannelId; + +use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::hex_utils; +use crate::types::UserChannelId; + +/// Details of a closed channel. +/// +/// Returned by [`Node::list_closed_channels`]. +/// +/// [`Node::list_closed_channels`]: crate::Node::list_closed_channels +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ClosedChannelDetails { + /// The channel's ID at the time it was closed. + pub channel_id: ChannelId, + /// The local identifier of the channel. + pub user_channel_id: UserChannelId, + /// The node ID of the channel's counterparty. + pub counterparty_node_id: Option, + /// The channel's funding transaction outpoint. + pub funding_txo: Option, + /// The channel's capacity in satoshis. + pub channel_capacity_sats: Option, + /// Our local balance in millisatoshis at the time of channel closure. + pub last_local_balance_msat: Option, + /// Indicates whether we initiated the channel opening. + /// + /// `true` if the channel was opened by us (outbound), `false` if opened by the counterparty + /// (inbound). This will be `false` for channels opened prior to this field being tracked. + pub is_outbound: bool, + /// Indicates whether the channel was publicly announced. + /// + /// This will be `false` for channels opened prior to this field being tracked. + pub is_announced: bool, + /// The reason for the channel closure. + pub closure_reason: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when the channel was closed. + pub closed_at: u64, +} + +impl_writeable_tlv_based!(ClosedChannelDetails, { + (0, channel_id, required), + (2, user_channel_id, required), + (4, counterparty_node_id, option), + (6, funding_txo, option), + (8, channel_capacity_sats, option), + (10, last_local_balance_msat, option), + (12, is_outbound, required), + (14, closure_reason, upgradable_option), + (16, closed_at, (default_value, SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0)).as_secs())), + (18, is_announced, required), +}); + +pub(crate) struct ClosedChannelDetailsUpdate(pub UserChannelId); + +impl StorableObjectUpdate for ClosedChannelDetailsUpdate { + fn id(&self) -> UserChannelId { + self.0 + } +} + +impl StorableObject for ClosedChannelDetails { + type Id = UserChannelId; + type Update = ClosedChannelDetailsUpdate; + + fn id(&self) -> UserChannelId { + self.user_channel_id + } + + fn update(&mut self, _update: Self::Update) -> bool { + // Closed channel records are immutable once written. + false + } + + fn to_update(&self) -> Self::Update { + ClosedChannelDetailsUpdate(self.user_channel_id) + } +} + +impl StorableObjectId for UserChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0.to_be_bytes()) + } +} diff --git a/src/event.rs b/src/event.rs index 80acd0690..8722d3de6 100644 --- a/src/event.rs +++ b/src/event.rs @@ -7,9 +7,10 @@ use core::future::Future; use core::task::{Poll, Waker}; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -33,6 +34,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; +use crate::closed_channel::ClosedChannelDetails; use crate::config::{may_announce_channel, Config}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; @@ -53,7 +55,8 @@ use crate::payment::store::{ use crate::payment::PaymentMetadata; use crate::runtime::Runtime; use crate::types::{ - CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, + ClosedChannelStore, CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, + Sweeper, Wallet, }; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, @@ -270,6 +273,18 @@ pub enum Event { counterparty_node_id: Option, /// This will be `None` for events serialized by LDK Node v0.2.1 and prior. reason: Option, + /// The channel's capacity in satoshis. + /// + /// This will be `None` for events serialized by LDK Node v0.8.0 and prior. + channel_capacity_sats: Option, + /// The channel's funding transaction outpoint. + /// + /// This will be `None` for events serialized by LDK Node v0.8.0 and prior. + channel_funding_txo: Option, + /// Our local balance in millisatoshis at the time of channel closure. + /// + /// This will be `None` for events serialized by LDK Node v0.8.0 and prior. + last_local_balance_msat: Option, }, /// A channel splice has been negotiated and the funding transaction is pending /// confirmation on-chain. @@ -331,6 +346,9 @@ impl_writeable_tlv_based_enum!(Event, (1, counterparty_node_id, option), (2, user_channel_id, required), (3, reason, upgradable_option), + (5, channel_capacity_sats, option), + (7, channel_funding_txo, option), + (9, last_local_balance_msat, option), }, (6, PaymentClaimable) => { (0, payment_hash, required), @@ -536,6 +554,13 @@ where liquidity_source: Arc>>, payment_store: Arc, peer_store: Arc>, + closed_channel_store: Arc, + // Tracks which user_channel_ids correspond to outbound channels. Populated at startup from + // list_channels() and updated on ChannelPending events. Consumed on ChannelClosed events. + outbound_channel_ids: Mutex>, + // Tracks which user_channel_ids correspond to announced channels. Populated at startup from + // list_channels() and updated on ChannelPending events. Consumed on ChannelClosed events. + announced_channel_ids: Mutex>, keys_manager: Arc, runtime: Arc, logger: L, @@ -555,11 +580,27 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Arc>>, payment_store: Arc, - peer_store: Arc>, keys_manager: Arc, - static_invoice_store: Option, onion_messenger: Arc, - om_mailbox: Option>, runtime: Arc, logger: L, - config: Arc, + peer_store: Arc>, closed_channel_store: Arc, + keys_manager: Arc, static_invoice_store: Option, + onion_messenger: Arc, om_mailbox: Option>, + runtime: Arc, logger: L, config: Arc, ) -> Self { + // Seed outbound_channel_ids and announced_channel_ids from currently open channels so we + // correctly classify channels that were already open when this node started. + let (outbound_channel_ids, announced_channel_ids) = { + let mut outbound = HashSet::new(); + let mut announced = HashSet::new(); + for chan in channel_manager.list_channels() { + if chan.is_outbound { + outbound.insert(UserChannelId(chan.user_channel_id)); + } + if chan.is_announced { + announced.insert(UserChannelId(chan.user_channel_id)); + } + } + (Mutex::new(outbound), Mutex::new(announced)) + }; + Self { event_queue, wallet, @@ -571,6 +612,9 @@ where liquidity_source, payment_store, peer_store, + closed_channel_store, + outbound_channel_ids, + announced_channel_ids, keys_manager, logger, runtime, @@ -1531,11 +1575,26 @@ where let network_graph = self.network_graph.read_only(); let channels = self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); - channels - .into_iter() - .find(|c| c.channel_id == channel_id) - .filter(|pending_channel| { - !pending_channel.is_outbound + let pending_channel = channels.into_iter().find(|c| c.channel_id == channel_id); + + if let Some(ref ch) = pending_channel { + if ch.is_outbound { + self.outbound_channel_ids + .lock() + .expect("Lock poisoned") + .insert(UserChannelId(user_channel_id)); + } + if ch.is_announced { + self.announced_channel_ids + .lock() + .expect("Lock poisoned") + .insert(UserChannelId(user_channel_id)); + } + } + + pending_channel + .filter(|ch| { + !ch.is_outbound && self.peer_store.get_peer(&counterparty_node_id).is_none() }) .and_then(|_| { @@ -1609,15 +1668,62 @@ where reason, user_channel_id, counterparty_node_id, - .. + channel_capacity_sats, + channel_funding_txo, + last_local_balance_msat, } => { log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); + let user_channel_id = UserChannelId(user_channel_id); + let is_outbound = self + .outbound_channel_ids + .lock() + .expect("Lock poisoned") + .remove(&user_channel_id); + let is_announced = self + .announced_channel_ids + .lock() + .expect("Lock poisoned") + .remove(&user_channel_id); + + let closed_at = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_secs(); + + let funding_txo = + channel_funding_txo.map(|op| OutPoint { txid: op.txid, vout: op.index as u32 }); + + let record = ClosedChannelDetails { + channel_id, + user_channel_id, + counterparty_node_id, + funding_txo, + channel_capacity_sats, + last_local_balance_msat, + is_outbound, + is_announced, + closure_reason: Some(reason.clone()), + closed_at, + }; + + if let Err(e) = self.closed_channel_store.insert(record).await { + log_error!( + self.logger, + "Failed to persist closed channel {}: {}", + channel_id, + e + ); + } + let event = Event::ChannelClosed { channel_id, - user_channel_id: UserChannelId(user_channel_id), + user_channel_id, counterparty_node_id, reason: Some(reason), + channel_capacity_sats, + channel_funding_txo: funding_txo, + last_local_balance_msat, }; match self.event_queue.add_event(event).await { diff --git a/src/io/mod.rs b/src/io/mod.rs index e16a99975..c36e0cd0b 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -29,6 +29,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The closed channel information will be persisted under this prefix. +pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "closed_channels"; +pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index b45064287..98fd99238 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ mod balance; mod builder; mod chain; +pub(crate) mod closed_channel; pub mod config; mod connection; mod data_store; @@ -128,6 +129,7 @@ pub use builder::BuildError; #[cfg(not(feature = "uniffi"))] pub use builder::NodeBuilder as Builder; use chain::ChainSource; +pub use closed_channel::ClosedChannelDetails; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, @@ -175,9 +177,9 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ClosedChannelStore, + DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, + Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; pub use vss_client; @@ -242,6 +244,7 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + closed_channel_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc, @@ -604,6 +607,7 @@ impl Node { Arc::clone(&self.liquidity_source), Arc::clone(&self.payment_store), Arc::clone(&self.peer_store), + Arc::clone(&self.closed_channel_store), Arc::clone(&self.keys_manager), static_invoice_store, Arc::clone(&self.onion_messenger), @@ -1148,6 +1152,13 @@ impl Node { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() } + /// Retrieve a list of closed channels. + /// + /// Channels are added to this list when they are closed and will be persisted across restarts. + pub fn list_closed_channels(&self) -> Vec { + self.closed_channel_store.list_filter(|_| true) + } + /// Connect to a node on the peer-to-peer network. /// /// If `persist` is set to `true`, we'll remember the peer and reconnect to it on restart. diff --git a/src/types.rs b/src/types.rs index 64209430b..538060997 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,6 +38,7 @@ use lightning_net_tokio::SocketDescriptor; use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; +use crate::closed_channel::ClosedChannelDetails; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; @@ -318,7 +319,7 @@ pub(crate) type PaymentStore = DataStore>; /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct UserChannelId(pub u128); impl Writeable for UserChannelId { @@ -628,3 +629,5 @@ impl From<&(u64, Vec)> for CustomTlvRecord { } pub(crate) type PendingPaymentStore = DataStore>; + +pub(crate) type ClosedChannelStore = DataStore>; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index adeb327bf..a78a0501d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1638,6 +1638,23 @@ pub(crate) async fn do_channel_full_cycle( assert_eq!(node_a.next_event(), None); assert_eq!(node_b.next_event(), None); + // Check that the closed channel record was persisted. + let closed_a = node_a.list_closed_channels(); + let closed_b = node_b.list_closed_channels(); + assert_eq!(closed_a.len(), 1); + assert_eq!(closed_b.len(), 1); + assert!(closed_a[0].channel_capacity_sats.is_some()); + assert!(closed_b[0].channel_capacity_sats.is_some()); + assert!(closed_a[0].is_outbound, "node_a opened the channel, should be outbound"); + assert!(!closed_b[0].is_outbound, "node_b received the channel, should be inbound"); + assert!(closed_a[0].closure_reason.is_some()); + assert!(closed_b[0].closure_reason.is_some()); + assert!(closed_a[0].funding_txo.is_some()); + assert!(closed_b[0].funding_txo.is_some()); + assert_eq!(closed_a[0].funding_txo, closed_b[0].funding_txo); + assert_eq!(closed_a[0].counterparty_node_id, Some(node_b.node_id())); + assert_eq!(closed_b[0].counterparty_node_id, Some(node_a.node_id())); + node_a.stop().unwrap(); println!("\nA stopped"); node_b.stop().unwrap(); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 5b07ab50d..0ed27f81f 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -35,7 +35,7 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, UnifiedPaymentResult, }; -use ldk_node::{Builder, Event, NodeError}; +use ldk_node::{Builder, ClosedChannelDetails, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; @@ -3140,3 +3140,111 @@ async fn do_lsps2_multi_lsp_picks_cheapest(reverse_order: bool) { cheap.stop().unwrap(); expensive.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn closed_channel_history_persists_after_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + println!("== Node A =="); + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + + println!("\n== Node B =="); + let mut config_b = random_config(true); + config_b.store_type = TestStoreType::Sqlite; + + let channel_amount_sat = 1_000_000; + let premine_amount_sat = 2_125_000; + + let closed_channel_before: ClosedChannelDetails; + + { + let node_a = setup_node(&chain_source, config_a.clone()); + let node_b = setup_node(&chain_source, config_b.clone()); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Open channel from A to B. + let funding_txo = open_channel(&node_a, &node_b, channel_amount_sat, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Cooperatively close. + let user_channel_id = node_a + .list_channels() + .into_iter() + .find(|c| c.counterparty_node_id == node_b.node_id()) + .map(|c| c.user_channel_id) + .expect("open channel not found"); + node_a.close_channel(&user_channel_id, node_b.node_id()).unwrap(); + expect_event!(node_a, ChannelClosed); + expect_event!(node_b, ChannelClosed); + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify the record is present before restart. + let closed_a = node_a.list_closed_channels(); + assert_eq!(closed_a.len(), 1); + let record = closed_a.into_iter().next().unwrap(); + assert_eq!(record.channel_capacity_sats, Some(channel_amount_sat)); + assert!(record.is_outbound); + assert!(record.is_announced); + assert_eq!(record.counterparty_node_id, Some(node_b.node_id())); + assert!(record.funding_txo.is_some()); + assert_eq!(record.funding_txo.unwrap().txid, funding_txo.txid); + assert!(record.closure_reason.is_some()); + + closed_channel_before = record; + + let closed_b = node_b.list_closed_channels(); + assert_eq!(closed_b.len(), 1); + let record_b = closed_b.into_iter().next().unwrap(); + assert_eq!(record_b.channel_capacity_sats, Some(channel_amount_sat)); + assert!(!record_b.is_outbound); + assert!(record_b.is_announced); + assert_eq!(record_b.counterparty_node_id, Some(node_a.node_id())); + assert!(record_b.funding_txo.is_some()); + assert_eq!(record_b.funding_txo.unwrap().txid, funding_txo.txid); + assert!(record_b.closure_reason.is_some()); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + } + + // Restart node_a with the same config and verify the record survived. + println!("\nRestarting node A..."); + let restarted_node_a = setup_node(&chain_source, config_a); + + let closed_after = restarted_node_a.list_closed_channels(); + assert_eq!(closed_after.len(), 1, "closed channel record should survive restart"); + + let record = &closed_after[0]; + assert_eq!(record.channel_id, closed_channel_before.channel_id); + assert_eq!(record.user_channel_id, closed_channel_before.user_channel_id); + assert_eq!(record.channel_capacity_sats, closed_channel_before.channel_capacity_sats); + assert_eq!(record.funding_txo, closed_channel_before.funding_txo); + assert_eq!(record.counterparty_node_id, closed_channel_before.counterparty_node_id); + assert_eq!(record.is_outbound, closed_channel_before.is_outbound); + assert_eq!(record.is_announced, closed_channel_before.is_announced); + assert_eq!(record.closed_at, closed_channel_before.closed_at); + assert_eq!(record.closure_reason, closed_channel_before.closure_reason); + + restarted_node_a.stop().unwrap(); +} From 34558a4826cab33497ef9b5ea36001af16809d9e Mon Sep 17 00:00:00 2001 From: Fernando Ledesma Date: Wed, 13 May 2026 13:20:02 -0500 Subject: [PATCH 2/6] Make counterparty_node_id non-optional on ChannelReady and ChannelClosed event We no longer need to maintain backwards compatibility with LDK Node v0.1.0, so we can make this a required field and avoid propagating the Option through the API. --- CHANGELOG.md | 5 +++++ src/event.rs | 28 ++++++++++++++++------------ tests/common/mod.rs | 5 ++--- tests/reorg_test.rs | 6 +++--- 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9f15e61f..025bfaa32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ - Users of the VSS storage backend must upgrade their VSS server to at least version `v0.1.0-alpha.0` before upgrading LDK Node. +## Serialization Compatibility +- The `counterparty_node_id` field of the `ChannelReady` and `ChannelClosed` events is now + required. Events persisted by LDK Node v0.1.0 and prior that are missing this field will + fail to deserialize. + # 0.7.0 - Dec. 3, 2025 This seventh minor release introduces numerous new features, bug fixes, and API improvements. In particular, it adds support for channel Splicing, Async Payments, as well as sourcing chain data from a Bitcoin Core REST backend. diff --git a/src/event.rs b/src/event.rs index 8722d3de6..bff5e65b4 100644 --- a/src/event.rs +++ b/src/event.rs @@ -249,9 +249,7 @@ pub enum Event { /// The `user_channel_id` of the channel. user_channel_id: UserChannelId, /// The `node_id` of the channel counterparty. - /// - /// This will be `None` for events serialized by LDK Node v0.1.0 and prior. - counterparty_node_id: Option, + counterparty_node_id: PublicKey, /// The outpoint of the channel's funding transaction. /// /// This represents the channel's current funding output, which may change when the @@ -268,9 +266,7 @@ pub enum Event { /// The `user_channel_id` of the channel. user_channel_id: UserChannelId, /// The `node_id` of the channel counterparty. - /// - /// This will be `None` for events serialized by LDK Node v0.1.0 and prior. - counterparty_node_id: Option, + counterparty_node_id: PublicKey, /// This will be `None` for events serialized by LDK Node v0.2.1 and prior. reason: Option, /// The channel's capacity in satoshis. @@ -330,7 +326,7 @@ impl_writeable_tlv_based_enum!(Event, }, (3, ChannelReady) => { (0, channel_id, required), - (1, counterparty_node_id, option), + (1, counterparty_node_id, required), (2, user_channel_id, required), (3, funding_txo, option), }, @@ -343,7 +339,7 @@ impl_writeable_tlv_based_enum!(Event, }, (5, ChannelClosed) => { (0, channel_id, required), - (1, counterparty_node_id, option), + (1, counterparty_node_id, required), (2, user_channel_id, required), (3, reason, upgradable_option), (5, channel_capacity_sats, option), @@ -1652,7 +1648,7 @@ where let event = Event::ChannelReady { channel_id, user_channel_id: UserChannelId(user_channel_id), - counterparty_node_id: Some(counterparty_node_id), + counterparty_node_id, funding_txo, }; match self.event_queue.add_event(event).await { @@ -1719,7 +1715,8 @@ where let event = Event::ChannelClosed { channel_id, user_channel_id, - counterparty_node_id, + counterparty_node_id: counterparty_node_id + .expect("counterparty_node_id must be set for closed channels"), reason: Some(reason), channel_capacity_sats, channel_funding_txo: funding_txo, @@ -2020,6 +2017,7 @@ mod tests { use std::sync::atomic::{AtomicU16, Ordering}; use std::time::Duration; + use bitcoin::secp256k1::{Secp256k1, SecretKey}; use lightning::util::test_utils::TestLogger; use super::*; @@ -2083,10 +2081,13 @@ mod tests { let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); + let secp = Secp256k1::new(); + let counterparty_node_id = + PublicKey::from_secret_key(&secp, &SecretKey::from_slice(&[1u8; 32]).unwrap()); let expected_event = Event::ChannelReady { channel_id: ChannelId([23u8; 32]), user_channel_id: UserChannelId(2323), - counterparty_node_id: None, + counterparty_node_id, funding_txo: None, }; event_queue.add_event(expected_event.clone()).await.unwrap(); @@ -2121,10 +2122,13 @@ mod tests { let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); + let secp = Secp256k1::new(); + let counterparty_node_id = + PublicKey::from_secret_key(&secp, &SecretKey::from_slice(&[1u8; 32]).unwrap()); let expected_event = Event::ChannelReady { channel_id: ChannelId([23u8; 32]), user_channel_id: UserChannelId(2323), - counterparty_node_id: None, + counterparty_node_id, funding_txo: None, }; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a78a0501d..13c2c0940 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -133,7 +133,7 @@ macro_rules! expect_channel_ready_event { match event { ref e @ Event::ChannelReady { user_channel_id, counterparty_node_id, .. } => { println!("{} got event {:?}", $node.node_id(), e); - assert_eq!(counterparty_node_id, Some($counterparty_node_id)); + assert_eq!(counterparty_node_id, $counterparty_node_id); $node.event_handled().unwrap(); user_channel_id }, @@ -170,8 +170,7 @@ macro_rules! expect_channel_ready_events { } } assert!( - ids.contains(&Some($counterparty_node_id_a)) - && ids.contains(&Some($counterparty_node_id_b)), + ids.contains(&$counterparty_node_id_a) && ids.contains(&$counterparty_node_id_b), "Expected ChannelReady events from {:?} and {:?}, but got {:?}", $counterparty_node_id_a, $counterparty_node_id_b, diff --git a/tests/reorg_test.rs b/tests/reorg_test.rs index 295d9fdd2..f2c3b6586 100644 --- a/tests/reorg_test.rs +++ b/tests/reorg_test.rs @@ -112,11 +112,11 @@ proptest! { let next_node = nodes.get((i + 1) % nodes.len()).unwrap(); let prev_node = nodes.get((i + nodes.len() - 1) % nodes.len()).unwrap(); - assert!(user_channels.get(&Some(next_node.node_id())) != None); - assert!(user_channels.get(&Some(prev_node.node_id())) != None); + assert!(user_channels.get(&next_node.node_id()) != None); + assert!(user_channels.get(&prev_node.node_id()) != None); let user_channel_id = - user_channels.get(&Some(next_node.node_id())).expect("Missing user channel for node"); + user_channels.get(&next_node.node_id()).expect("Missing user channel for node"); node_channels_id.insert(node.node_id(), *user_channel_id); } From cb6f32241e9bd8e36435425b0d9c83d91c810c4f Mon Sep 17 00:00:00 2001 From: Fernando Ledesma Date: Fri, 22 May 2026 11:16:26 -0500 Subject: [PATCH 3/6] Make ChannelClosed persistence idempotent and retry on failure Fall back to the already-persisted record for is_outbound/is_announced when in-memory sets are empty on replay, use insert_or_update to avoid overwriting correct values, and propagate persist failures as ReplayEvent. --- src/event.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/event.rs b/src/event.rs index bff5e65b4..16efc7324 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1671,17 +1671,27 @@ where log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); let user_channel_id = UserChannelId(user_channel_id); - let is_outbound = self + let is_outbound_from_set = self .outbound_channel_ids .lock() .expect("Lock poisoned") .remove(&user_channel_id); - let is_announced = self + let is_announced_from_set = self .announced_channel_ids .lock() .expect("Lock poisoned") .remove(&user_channel_id); + // On replay (after a restart or after handle_event returns ReplayEvent), + // the channel is no longer in list_channels() and the in-memory sets are + // not repopulated for it, so .remove() returns false. Fall back to any + // already-persisted record so we don't overwrite correct values with false. + let (is_outbound, is_announced) = self + .closed_channel_store + .get(&user_channel_id) + .map(|existing| (existing.is_outbound, existing.is_announced)) + .unwrap_or((is_outbound_from_set, is_announced_from_set)); + let closed_at = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or(Duration::ZERO) @@ -1703,19 +1713,22 @@ where closed_at, }; - if let Err(e) = self.closed_channel_store.insert(record).await { + if let Err(e) = self.closed_channel_store.insert_or_update(record).await { log_error!( self.logger, "Failed to persist closed channel {}: {}", channel_id, e ); + return Err(ReplayEvent()); } let event = Event::ChannelClosed { channel_id, user_channel_id, counterparty_node_id: counterparty_node_id + // Since LDK Node v0.2 this is expected to always be set. See + // CHANGELOG.md for details on the serialization compatibility break. .expect("counterparty_node_id must be set for closed channels"), reason: Some(reason), channel_capacity_sats, From 4f91d27c888bfc2791be59c1453ff2fb508ace8a Mon Sep 17 00:00:00 2001 From: Fernando Ledesma Date: Fri, 22 May 2026 11:17:47 -0500 Subject: [PATCH 4/6] Document when optional fields of ClosedChannelDetails can be None --- src/closed_channel.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/closed_channel.rs b/src/closed_channel.rs index adbcf855b..79816eadf 100644 --- a/src/closed_channel.rs +++ b/src/closed_channel.rs @@ -30,12 +30,21 @@ pub struct ClosedChannelDetails { /// The local identifier of the channel. pub user_channel_id: UserChannelId, /// The node ID of the channel's counterparty. + /// + /// Will be `None` if the channel was closed before the counterparty's node ID could be + /// determined (e.g., very early in the channel negotiation process). pub counterparty_node_id: Option, /// The channel's funding transaction outpoint. + /// + /// Will be `None` if the channel was closed before a funding transaction was established. pub funding_txo: Option, /// The channel's capacity in satoshis. + /// + /// Will be `None` if the channel was closed before the capacity was known. pub channel_capacity_sats: Option, /// Our local balance in millisatoshis at the time of channel closure. + /// + /// Will be `None` if the local balance was not available at the time of closure. pub last_local_balance_msat: Option, /// Indicates whether we initiated the channel opening. /// @@ -47,6 +56,9 @@ pub struct ClosedChannelDetails { /// This will be `false` for channels opened prior to this field being tracked. pub is_announced: bool, /// The reason for the channel closure. + /// + /// Will be `None` if the closure reason could not be decoded, e.g., if it was written by a + /// future version of LDK Node using a closure reason variant not yet known to this version. pub closure_reason: Option, /// The timestamp, in seconds since start of the UNIX epoch, when the channel was closed. pub closed_at: u64, From 37edfdb28eef8cb1627da50b2bece156f9f91c83 Mon Sep 17 00:00:00 2001 From: Fernando Ledesma Date: Fri, 12 Jun 2026 11:57:34 -0500 Subject: [PATCH 5/6] fixup! Add persistent closed channel history and `list_closed_channels()` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the in-memory set + `insert_or_update` fallback approach with a fully durable solution for tracking `is_outbound`/`is_announced` flags. The previous approach stored these flags in ephemeral `HashSet`s that were seeded from `list_channels()` at startup and consumed on `ChannelClosed`. A fallback to any existing `ClosedChannelDetails` record was added to handle `ReplayEvent`, but a gap remained: if `insert_or_update` failed (returning `ReplayEvent`) and the node restarted before the retry, the in-memory sets would be empty (closed channels don't appear in `list_channels()`) and no persisted record would exist yet, causing both flags to silently default to `falsee`. Fix this by persisting a `PendingChannelInfo` record (containing `is_outbound` and `is_announced`) to the KV store at `ChannelPending` time under a new `pending_channels/` namespace. The `ChannelClosed` handler now resolves the flags with the following priority: 1. `pending_channel_store` — durable, survives restarts and replays 2. In-memory sets — covers channels opened before this version 3. Existing `ClosedChannelDetails` record — idempotency guard The `PendingChannelInfo` record is deleted after `event_queue.add_event` succeeds. It is intentionally kept alive until that point so that any replay of `ChannelClosed` (e.g. due to a failed `insert_or_update` or `add_event`) still finds the correct flags in the store. --- src/builder.rs | 78 +++++++++++++++++++++++++------------ src/closed_channel.rs | 42 ++++++++++++++++++++ src/event.rs | 89 ++++++++++++++++++++++++++++++++++--------- src/io/mod.rs | 4 ++ src/lib.rs | 6 ++- src/types.rs | 4 +- 6 files changed, 176 insertions(+), 47 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 87f297dc2..80c92c502 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -67,6 +67,8 @@ use crate::io::{ self, CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -81,7 +83,7 @@ use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ AsyncPersister, ChainMonitor, ChannelManager, ClosedChannelStore, DynStore, DynStoreRef, DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, - PaymentStore, PeerManager, PendingPaymentStore, + PaymentStore, PeerManager, PendingChannelStore, PendingPaymentStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1381,30 +1383,41 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res, closed_channel_store_res) = - runtime.block_on(async move { - tokio::join!( - read_all_objects( - &*kv_store_ref, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_all_objects( - &*kv_store_ref, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ), - read_all_objects( - &*kv_store_ref, - CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ) - ) - }); + let ( + payment_store_res, + node_metris_res, + pending_payment_store_res, + closed_channel_store_res, + pending_channel_store_res, + ) = runtime.block_on(async move { + tokio::join!( + read_all_objects( + &*kv_store_ref, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + read_all_objects( + &*kv_store_ref, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + ) + }); // Initialize the status fields. let node_metrics = match node_metris_res { @@ -1627,6 +1640,20 @@ fn build_with_store_internal( }, }; + let pending_channel_store = match pending_channel_store_res { + Ok(pending_channels) => Arc::new(PendingChannelStore::new( + pending_channels, + PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read pending channel data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let wallet = Arc::new(Wallet::new( bdk_wallet, wallet_persister, @@ -2172,6 +2199,7 @@ fn build_with_store_internal( peer_store, payment_store, closed_channel_store, + pending_channel_store, lnurl_auth, is_running, node_metrics, diff --git a/src/closed_channel.rs b/src/closed_channel.rs index 79816eadf..d65142820 100644 --- a/src/closed_channel.rs +++ b/src/closed_channel.rs @@ -77,6 +77,48 @@ impl_writeable_tlv_based!(ClosedChannelDetails, { (18, is_announced, required), }); +/// Channel flags persisted at channel-pending time so they remain accessible when the channel +/// closes, even after a restart or when `handle_event` returns [`ReplayEvent`]. +/// +/// [`ReplayEvent`]: lightning::events::ReplayEvent +#[derive(Clone, Debug)] +pub(crate) struct PendingChannelInfo { + pub user_channel_id: UserChannelId, + pub is_outbound: bool, + pub is_announced: bool, +} + +impl_writeable_tlv_based!(PendingChannelInfo, { + (0, user_channel_id, required), + (2, is_outbound, required), + (4, is_announced, required), +}); + +pub(crate) struct PendingChannelInfoUpdate(pub UserChannelId); + +impl StorableObjectUpdate for PendingChannelInfoUpdate { + fn id(&self) -> UserChannelId { + self.0 + } +} + +impl StorableObject for PendingChannelInfo { + type Id = UserChannelId; + type Update = PendingChannelInfoUpdate; + + fn id(&self) -> UserChannelId { + self.user_channel_id + } + + fn update(&mut self, _update: Self::Update) -> bool { + false + } + + fn to_update(&self) -> Self::Update { + PendingChannelInfoUpdate(self.user_channel_id) + } +} + pub(crate) struct ClosedChannelDetailsUpdate(pub UserChannelId); impl StorableObjectUpdate for ClosedChannelDetailsUpdate { diff --git a/src/event.rs b/src/event.rs index 16efc7324..67d72fbe5 100644 --- a/src/event.rs +++ b/src/event.rs @@ -34,7 +34,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; -use crate::closed_channel::ClosedChannelDetails; +use crate::closed_channel::{ClosedChannelDetails, PendingChannelInfo}; use crate::config::{may_announce_channel, Config}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; @@ -56,7 +56,7 @@ use crate::payment::PaymentMetadata; use crate::runtime::Runtime; use crate::types::{ ClosedChannelStore, CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, - Sweeper, Wallet, + PendingChannelStore, Sweeper, Wallet, }; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, @@ -551,6 +551,7 @@ where payment_store: Arc, peer_store: Arc>, closed_channel_store: Arc, + pending_channel_store: Arc, // Tracks which user_channel_ids correspond to outbound channels. Populated at startup from // list_channels() and updated on ChannelPending events. Consumed on ChannelClosed events. outbound_channel_ids: Mutex>, @@ -577,9 +578,10 @@ where output_sweeper: Arc, network_graph: Arc, liquidity_source: Arc>>, payment_store: Arc, peer_store: Arc>, closed_channel_store: Arc, - keys_manager: Arc, static_invoice_store: Option, - onion_messenger: Arc, om_mailbox: Option>, - runtime: Arc, logger: L, config: Arc, + pending_channel_store: Arc, keys_manager: Arc, + static_invoice_store: Option, onion_messenger: Arc, + om_mailbox: Option>, runtime: Arc, logger: L, + config: Arc, ) -> Self { // Seed outbound_channel_ids and announced_channel_ids from currently open channels so we // correctly classify channels that were already open when this node started. @@ -609,6 +611,7 @@ where payment_store, peer_store, closed_channel_store, + pending_channel_store, outbound_channel_ids, announced_channel_ids, keys_manager, @@ -1386,12 +1389,23 @@ where 100 ); } + // For LSPS2 JIT channels (channel_override_config is Some iff the counterparty + // is our configured LSP), accept with ZeroConfZeroReserve so the LSP is not + // forced to keep 1000 sats locked as reserve. Without this, the hard + // MIN_THEIR_CHAN_RESERVE_SATOSHIS = 1000 floor in LDK reduces the usable + // outbound capacity enough that the initial HTLC forward fails on small channels. + let is_lsps2_channel = channel_override_config.is_some(); let res = if allow_0conf { + let trusted_features = if is_lsps2_channel { + TrustedChannelFeatures::ZeroConfZeroReserve + } else { + TrustedChannelFeatures::ZeroConf + }; self.channel_manager.accept_inbound_channel_from_trusted_peer( &temporary_channel_id, &counterparty_node_id, user_channel_id, - TrustedChannelFeatures::ZeroConf, + trusted_features, channel_override_config, ) } else { @@ -1567,13 +1581,13 @@ where }, }; - let peer_to_store = { + let (pending_info_opt, peer_to_store) = { let network_graph = self.network_graph.read_only(); let channels = self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); let pending_channel = channels.into_iter().find(|c| c.channel_id == channel_id); - if let Some(ref ch) = pending_channel { + let pending_info_opt = if let Some(ref ch) = pending_channel { if ch.is_outbound { self.outbound_channel_ids .lock() @@ -1586,9 +1600,16 @@ where .expect("Lock poisoned") .insert(UserChannelId(user_channel_id)); } - } + Some(PendingChannelInfo { + user_channel_id: UserChannelId(user_channel_id), + is_outbound: ch.is_outbound, + is_announced: ch.is_announced, + }) + } else { + None + }; - pending_channel + let peer_to_store = pending_channel .filter(|ch| { !ch.is_outbound && self.peer_store.get_peer(&counterparty_node_id).is_none() @@ -1603,8 +1624,23 @@ where node_id: counterparty_node_id, address: address.clone(), }) - }) - }; + }); + + (pending_info_opt, peer_to_store) + }; // network_graph is dropped here, before any await + + if let Some(pending_info) = pending_info_opt { + if let Err(e) = self.pending_channel_store.insert_or_update(pending_info).await + { + log_error!( + self.logger, + "Failed to persist pending channel info {}: {}", + channel_id, + e + ); + return Err(ReplayEvent()); + } + } if let Some(peer) = peer_to_store { self.peer_store.add_peer(peer).await.unwrap_or_else(|e| { log_error!( @@ -1682,14 +1718,20 @@ where .expect("Lock poisoned") .remove(&user_channel_id); - // On replay (after a restart or after handle_event returns ReplayEvent), - // the channel is no longer in list_channels() and the in-memory sets are - // not repopulated for it, so .remove() returns false. Fall back to any - // already-persisted record so we don't overwrite correct values with false. + // Primary: use the durably-persisted PendingChannelInfo written at + // ChannelPending time. Falls back to in-memory sets (populated at startup + // or on ChannelPending), then to any already-persisted ClosedChannelDetails + // record (for the replay case where insert_or_update already succeeded but + // add_event failed and PendingChannelInfo was already cleaned up). let (is_outbound, is_announced) = self - .closed_channel_store + .pending_channel_store .get(&user_channel_id) - .map(|existing| (existing.is_outbound, existing.is_announced)) + .map(|info| (info.is_outbound, info.is_announced)) + .or_else(|| { + self.closed_channel_store + .get(&user_channel_id) + .map(|existing| (existing.is_outbound, existing.is_announced)) + }) .unwrap_or((is_outbound_from_set, is_announced_from_set)); let closed_at = SystemTime::now() @@ -1737,7 +1779,16 @@ where }; match self.event_queue.add_event(event).await { - Ok(_) => {}, + Ok(_) => { + if let Err(e) = self.pending_channel_store.remove(&user_channel_id).await { + log_error!( + self.logger, + "Failed to remove pending channel info for {}: {}", + channel_id, + e + ); + } + }, Err(e) => { log_error!(self.logger, "Failed to push to event queue: {}", e); return Err(ReplayEvent()); diff --git a/src/io/mod.rs b/src/io/mod.rs index c36e0cd0b..eddadf084 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -33,6 +33,10 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "closed_channels"; pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The pending channel information will be persisted under this prefix. +pub(crate) const PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_channels"; +pub(crate) const PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index 98fd99238..e9203ae40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -178,8 +178,8 @@ use runtime::Runtime; pub use tokio; use types::{ Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ClosedChannelStore, - DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, - Scorer, Sweeper, Wallet, + DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, + PendingChannelStore, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; pub use vss_client; @@ -245,6 +245,7 @@ pub struct Node { peer_store: Arc>>, payment_store: Arc, closed_channel_store: Arc, + pending_channel_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc, @@ -608,6 +609,7 @@ impl Node { Arc::clone(&self.payment_store), Arc::clone(&self.peer_store), Arc::clone(&self.closed_channel_store), + Arc::clone(&self.pending_channel_store), Arc::clone(&self.keys_manager), static_invoice_store, Arc::clone(&self.onion_messenger), diff --git a/src/types.rs b/src/types.rs index 538060997..0f9faa110 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,7 +38,7 @@ use lightning_net_tokio::SocketDescriptor; use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; -use crate::closed_channel::ClosedChannelDetails; +use crate::closed_channel::{ClosedChannelDetails, PendingChannelInfo}; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; @@ -631,3 +631,5 @@ impl From<&(u64, Vec)> for CustomTlvRecord { pub(crate) type PendingPaymentStore = DataStore>; pub(crate) type ClosedChannelStore = DataStore>; + +pub(crate) type PendingChannelStore = DataStore>; From 1ff7639069a7c85779e5ac06e0e9ad12becb9c0f Mon Sep 17 00:00:00 2001 From: Fernando Ledesma Date: Fri, 19 Jun 2026 12:04:49 -0500 Subject: [PATCH 6/6] fixup! Add persistent closed channel history and `list_closed_channels()` Generalize PendingChannelInfo into ChannelRecord::Funded. Rather than shipping two separate per-channel stores, introduce a ChannelRecord enum with a single Funded variant that already carries the fields the splice PR will need (counterparty_node_id, channel_id) in addition to is_outbound/is_announced flags. This lets the splice PR add pending_splice: Option as a new optional TLV field without touching the store infrastructure. --- src/builder.rs | 32 ++++---- src/channel/mod.rs | 10 +++ src/channel/store.rs | 130 +++++++++++++++++++++++++++++++ src/closed_channel.rs | 62 +-------------- src/event.rs | 65 +++++++++------- src/io/mod.rs | 6 +- src/lib.rs | 11 +-- src/types.rs | 5 +- tests/common/mod.rs | 4 +- tests/integration_tests_rust.rs | 4 +- tests/upgrade_downgrade_tests.rs | 2 +- 11 files changed, 213 insertions(+), 118 deletions(-) create mode 100644 src/channel/mod.rs create mode 100644 src/channel/store.rs diff --git a/src/builder.rs b/src/builder.rs index 80c92c502..fdf1ef8fd 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,11 +64,11 @@ use crate::io::utils::{ }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + self, CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, + CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -81,9 +81,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, ClosedChannelStore, DynStore, DynStoreRef, - DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, - PaymentStore, PeerManager, PendingChannelStore, PendingPaymentStore, + AsyncPersister, ChainMonitor, ChannelManager, ChannelRecordStore, ClosedChannelStore, DynStore, + DynStoreRef, DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, + OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1388,7 +1388,7 @@ fn build_with_store_internal( node_metris_res, pending_payment_store_res, closed_channel_store_res, - pending_channel_store_res, + channel_record_store_res, ) = runtime.block_on(async move { tokio::join!( read_all_objects( @@ -1412,8 +1412,8 @@ fn build_with_store_internal( ), read_all_objects( &*kv_store_ref, - PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE, Arc::clone(&logger_ref), ), ) @@ -1640,16 +1640,16 @@ fn build_with_store_internal( }, }; - let pending_channel_store = match pending_channel_store_res { - Ok(pending_channels) => Arc::new(PendingChannelStore::new( - pending_channels, - PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), - PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + let channel_record_store = match channel_record_store_res { + Ok(channel_records) => Arc::new(ChannelRecordStore::new( + channel_records, + CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), Arc::clone(&kv_store), Arc::clone(&logger), )), Err(e) => { - log_error!(logger, "Failed to read pending channel data from store: {}", e); + log_error!(logger, "Failed to read channel record data from store: {}", e); return Err(BuildError::ReadFailed); }, }; @@ -2199,7 +2199,7 @@ fn build_with_store_internal( peer_store, payment_store, closed_channel_store, - pending_channel_store, + channel_record_store, lnurl_auth, is_running, node_metrics, diff --git a/src/channel/mod.rs b/src/channel/mod.rs new file mode 100644 index 000000000..95a9f5224 --- /dev/null +++ b/src/channel/mod.rs @@ -0,0 +1,10 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Per-channel state tracking. + +pub(crate) mod store; diff --git a/src/channel/store.rs b/src/channel/store.rs new file mode 100644 index 000000000..0d2969218 --- /dev/null +++ b/src/channel/store.rs @@ -0,0 +1,130 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::secp256k1::PublicKey; +use lightning::impl_writeable_tlv_based_enum; +use lightning::ln::types::ChannelId; + +use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::hex_utils; +use crate::types::UserChannelId; + +/// Persistent per-channel state tracked by LDK Node, keyed by [`UserChannelId`]. +/// +/// Durably stores channel flags at `ChannelPending` time so they remain accessible when the +/// channel closes, even after a restart or a [`ReplayEvent`]. The `Funded` variant is designed +/// to be extended with a `pending_splice` field in a future PR to support splice retry across +/// restarts and peer disconnects. +/// +/// [`ReplayEvent`]: lightning::events::ReplayEvent +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum ChannelRecord { + /// State for a live channel whose funding transaction exists. + Funded { + user_channel_id: UserChannelId, + /// The node ID of the channel counterparty. + counterparty_node_id: PublicKey, + /// The channel's ID at the time the `ChannelPending` event fired. + channel_id: ChannelId, + /// Whether we opened the channel (outbound) or the counterparty did (inbound). + is_outbound: bool, + /// Whether the channel was publicly announced. + is_announced: bool, + }, +} + +impl_writeable_tlv_based_enum!(ChannelRecord, + (0, Funded) => { + (0, user_channel_id, required), + (2, counterparty_node_id, required), + (4, channel_id, required), + (6, is_outbound, required), + (8, is_announced, required), + }, +); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelRecordUpdate { + pub user_channel_id: UserChannelId, +} + +impl StorableObjectUpdate for ChannelRecordUpdate { + fn id(&self) -> UserChannelId { + self.user_channel_id + } +} + +impl StorableObject for ChannelRecord { + type Id = UserChannelId; + type Update = ChannelRecordUpdate; + + fn id(&self) -> UserChannelId { + match self { + ChannelRecord::Funded { user_channel_id, .. } => *user_channel_id, + } + } + + fn update(&mut self, _update: Self::Update) -> bool { + // ChannelRecord fields are immutable once written in this version. Returning false + // makes insert_or_update a no-op when the record already exists, ensuring idempotency + // on ChannelPending replay. + false + } + + fn to_update(&self) -> Self::Update { + ChannelRecordUpdate { user_channel_id: self.id() } + } +} + +impl StorableObjectId for UserChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0.to_be_bytes()) + } +} + +#[cfg(test)] +mod tests { + use lightning::ln::types::ChannelId; + use lightning::util::ser::{Readable, Writeable}; + + use super::*; + + fn make_record(is_outbound: bool, is_announced: bool) -> ChannelRecord { + let user_channel_id = UserChannelId(42); + // A valid compressed public key: prefix 0x02 followed by 32 bytes. + let counterparty_node_id = PublicKey::from_slice(&[2u8; 33]).expect("valid pubkey"); + let channel_id = ChannelId([3u8; 32]); + ChannelRecord::Funded { + user_channel_id, + counterparty_node_id, + channel_id, + is_outbound, + is_announced, + } + } + + #[test] + fn channel_record_roundtrips() { + for (is_outbound, is_announced) in + [(true, false), (false, true), (true, true), (false, false)] + { + let record = make_record(is_outbound, is_announced); + let encoded = record.encode(); + let decoded = ChannelRecord::read(&mut &encoded[..]).expect("decode succeeds"); + assert_eq!(record, decoded); + assert_eq!(decoded.id(), UserChannelId(42)); + assert!(matches!( + decoded, + ChannelRecord::Funded { + is_outbound: dec_out, + is_announced: dec_ann, + .. + } if dec_out == is_outbound && dec_ann == is_announced + )); + } + } +} diff --git a/src/closed_channel.rs b/src/closed_channel.rs index d65142820..0bd00665e 100644 --- a/src/closed_channel.rs +++ b/src/closed_channel.rs @@ -5,16 +5,13 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - use bitcoin::secp256k1::PublicKey; use bitcoin::OutPoint; use lightning::events::ClosureReason; use lightning::impl_writeable_tlv_based; use lightning::ln::types::ChannelId; -use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; -use crate::hex_utils; +use crate::data_store::{StorableObject, StorableObjectUpdate}; use crate::types::UserChannelId; /// Details of a closed channel. @@ -30,10 +27,7 @@ pub struct ClosedChannelDetails { /// The local identifier of the channel. pub user_channel_id: UserChannelId, /// The node ID of the channel's counterparty. - /// - /// Will be `None` if the channel was closed before the counterparty's node ID could be - /// determined (e.g., very early in the channel negotiation process). - pub counterparty_node_id: Option, + pub counterparty_node_id: PublicKey, /// The channel's funding transaction outpoint. /// /// Will be `None` if the channel was closed before a funding transaction was established. @@ -67,58 +61,16 @@ pub struct ClosedChannelDetails { impl_writeable_tlv_based!(ClosedChannelDetails, { (0, channel_id, required), (2, user_channel_id, required), - (4, counterparty_node_id, option), + (4, counterparty_node_id, required), (6, funding_txo, option), (8, channel_capacity_sats, option), (10, last_local_balance_msat, option), (12, is_outbound, required), (14, closure_reason, upgradable_option), - (16, closed_at, (default_value, SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0)).as_secs())), + (16, closed_at, required), (18, is_announced, required), }); -/// Channel flags persisted at channel-pending time so they remain accessible when the channel -/// closes, even after a restart or when `handle_event` returns [`ReplayEvent`]. -/// -/// [`ReplayEvent`]: lightning::events::ReplayEvent -#[derive(Clone, Debug)] -pub(crate) struct PendingChannelInfo { - pub user_channel_id: UserChannelId, - pub is_outbound: bool, - pub is_announced: bool, -} - -impl_writeable_tlv_based!(PendingChannelInfo, { - (0, user_channel_id, required), - (2, is_outbound, required), - (4, is_announced, required), -}); - -pub(crate) struct PendingChannelInfoUpdate(pub UserChannelId); - -impl StorableObjectUpdate for PendingChannelInfoUpdate { - fn id(&self) -> UserChannelId { - self.0 - } -} - -impl StorableObject for PendingChannelInfo { - type Id = UserChannelId; - type Update = PendingChannelInfoUpdate; - - fn id(&self) -> UserChannelId { - self.user_channel_id - } - - fn update(&mut self, _update: Self::Update) -> bool { - false - } - - fn to_update(&self) -> Self::Update { - PendingChannelInfoUpdate(self.user_channel_id) - } -} - pub(crate) struct ClosedChannelDetailsUpdate(pub UserChannelId); impl StorableObjectUpdate for ClosedChannelDetailsUpdate { @@ -144,9 +96,3 @@ impl StorableObject for ClosedChannelDetails { ClosedChannelDetailsUpdate(self.user_channel_id) } } - -impl StorableObjectId for UserChannelId { - fn encode_to_hex_str(&self) -> String { - hex_utils::to_string(&self.0.to_be_bytes()) - } -} diff --git a/src/event.rs b/src/event.rs index 67d72fbe5..dd27cf6eb 100644 --- a/src/event.rs +++ b/src/event.rs @@ -34,7 +34,8 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; -use crate::closed_channel::{ClosedChannelDetails, PendingChannelInfo}; +use crate::channel::store::ChannelRecord; +use crate::closed_channel::ClosedChannelDetails; use crate::config::{may_announce_channel, Config}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; @@ -55,8 +56,8 @@ use crate::payment::store::{ use crate::payment::PaymentMetadata; use crate::runtime::Runtime; use crate::types::{ - ClosedChannelStore, CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, - PendingChannelStore, Sweeper, Wallet, + ChannelRecordStore, ClosedChannelStore, CustomTlvRecord, DynStore, KeysManager, OnionMessenger, + PaymentStore, Sweeper, Wallet, }; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, @@ -551,7 +552,7 @@ where payment_store: Arc, peer_store: Arc>, closed_channel_store: Arc, - pending_channel_store: Arc, + channel_record_store: Arc, // Tracks which user_channel_ids correspond to outbound channels. Populated at startup from // list_channels() and updated on ChannelPending events. Consumed on ChannelClosed events. outbound_channel_ids: Mutex>, @@ -578,7 +579,7 @@ where output_sweeper: Arc, network_graph: Arc, liquidity_source: Arc>>, payment_store: Arc, peer_store: Arc>, closed_channel_store: Arc, - pending_channel_store: Arc, keys_manager: Arc, + channel_record_store: Arc, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, runtime: Arc, logger: L, config: Arc, @@ -611,7 +612,7 @@ where payment_store, peer_store, closed_channel_store, - pending_channel_store, + channel_record_store, outbound_channel_ids, announced_channel_ids, keys_manager, @@ -1581,13 +1582,13 @@ where }, }; - let (pending_info_opt, peer_to_store) = { + let (record_opt, peer_to_store) = { let network_graph = self.network_graph.read_only(); let channels = self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); let pending_channel = channels.into_iter().find(|c| c.channel_id == channel_id); - let pending_info_opt = if let Some(ref ch) = pending_channel { + let record_opt = if let Some(ref ch) = pending_channel { if ch.is_outbound { self.outbound_channel_ids .lock() @@ -1600,8 +1601,10 @@ where .expect("Lock poisoned") .insert(UserChannelId(user_channel_id)); } - Some(PendingChannelInfo { + Some(ChannelRecord::Funded { user_channel_id: UserChannelId(user_channel_id), + counterparty_node_id, + channel_id, is_outbound: ch.is_outbound, is_announced: ch.is_announced, }) @@ -1626,15 +1629,14 @@ where }) }); - (pending_info_opt, peer_to_store) + (record_opt, peer_to_store) }; // network_graph is dropped here, before any await - if let Some(pending_info) = pending_info_opt { - if let Err(e) = self.pending_channel_store.insert_or_update(pending_info).await - { + if let Some(record) = record_opt { + if let Err(e) = self.channel_record_store.insert_or_update(record).await { log_error!( self.logger, - "Failed to persist pending channel info {}: {}", + "Failed to persist channel record for {}: {}", channel_id, e ); @@ -1718,15 +1720,19 @@ where .expect("Lock poisoned") .remove(&user_channel_id); - // Primary: use the durably-persisted PendingChannelInfo written at - // ChannelPending time. Falls back to in-memory sets (populated at startup - // or on ChannelPending), then to any already-persisted ClosedChannelDetails - // record (for the replay case where insert_or_update already succeeded but - // add_event failed and PendingChannelInfo was already cleaned up). + // Primary: use the durably-persisted ChannelRecord written at ChannelPending + // time. Falls back to in-memory sets (populated at startup or on + // ChannelPending), then to any already-persisted ClosedChannelDetails record + // (for the replay case where insert_or_update already succeeded but + // add_event failed and the ChannelRecord was already cleaned up). let (is_outbound, is_announced) = self - .pending_channel_store + .channel_record_store .get(&user_channel_id) - .map(|info| (info.is_outbound, info.is_announced)) + .and_then(|record| match record { + ChannelRecord::Funded { is_outbound, is_announced, .. } => { + Some((is_outbound, is_announced)) + }, + }) .or_else(|| { self.closed_channel_store .get(&user_channel_id) @@ -1739,8 +1745,12 @@ where .unwrap_or(Duration::ZERO) .as_secs(); - let funding_txo = - channel_funding_txo.map(|op| OutPoint { txid: op.txid, vout: op.index as u32 }); + let funding_txo = channel_funding_txo.map(|op| op.into_bitcoin_outpoint()); + + // Since LDK Node v0.2 this is expected to always be set. See + // CHANGELOG.md for details on the serialization compatibility break. + let counterparty_node_id = counterparty_node_id + .expect("counterparty_node_id must be set for closed channels"); let record = ClosedChannelDetails { channel_id, @@ -1768,10 +1778,7 @@ where let event = Event::ChannelClosed { channel_id, user_channel_id, - counterparty_node_id: counterparty_node_id - // Since LDK Node v0.2 this is expected to always be set. See - // CHANGELOG.md for details on the serialization compatibility break. - .expect("counterparty_node_id must be set for closed channels"), + counterparty_node_id, reason: Some(reason), channel_capacity_sats, channel_funding_txo: funding_txo, @@ -1780,10 +1787,10 @@ where match self.event_queue.add_event(event).await { Ok(_) => { - if let Err(e) = self.pending_channel_store.remove(&user_channel_id).await { + if let Err(e) = self.channel_record_store.remove(&user_channel_id).await { log_error!( self.logger, - "Failed to remove pending channel info for {}: {}", + "Failed to remove channel record for {}: {}", channel_id, e ); diff --git a/src/io/mod.rs b/src/io/mod.rs index eddadf084..b37fb22f8 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -33,9 +33,9 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "closed_channels"; pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; -/// The pending channel information will be persisted under this prefix. -pub(crate) const PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_channels"; -pub(crate) const PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The per-channel records will be persisted under this prefix. +pub(crate) const CHANNEL_RECORD_PERSISTENCE_PRIMARY_NAMESPACE: &str = "channel_records"; +pub(crate) const CHANNEL_RECORD_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index e9203ae40..c2cae5283 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ mod balance; mod builder; mod chain; +mod channel; pub(crate) mod closed_channel; pub mod config; mod connection; @@ -177,9 +178,9 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ClosedChannelStore, - DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, - PendingChannelStore, Router, Scorer, Sweeper, Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ChannelRecordStore, + ClosedChannelStore, DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, + PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; pub use vss_client; @@ -245,7 +246,7 @@ pub struct Node { peer_store: Arc>>, payment_store: Arc, closed_channel_store: Arc, - pending_channel_store: Arc, + channel_record_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc, @@ -609,7 +610,7 @@ impl Node { Arc::clone(&self.payment_store), Arc::clone(&self.peer_store), Arc::clone(&self.closed_channel_store), - Arc::clone(&self.pending_channel_store), + Arc::clone(&self.channel_record_store), Arc::clone(&self.keys_manager), static_invoice_store, Arc::clone(&self.onion_messenger), diff --git a/src/types.rs b/src/types.rs index 0f9faa110..4253c5733 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,7 +38,8 @@ use lightning_net_tokio::SocketDescriptor; use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; -use crate::closed_channel::{ClosedChannelDetails, PendingChannelInfo}; +use crate::channel::store::ChannelRecord; +use crate::closed_channel::ClosedChannelDetails; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; @@ -632,4 +633,4 @@ pub(crate) type PendingPaymentStore = DataStore>; -pub(crate) type PendingChannelStore = DataStore>; +pub(crate) type ChannelRecordStore = DataStore>; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 13c2c0940..2c599de72 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1651,8 +1651,8 @@ pub(crate) async fn do_channel_full_cycle( assert!(closed_a[0].funding_txo.is_some()); assert!(closed_b[0].funding_txo.is_some()); assert_eq!(closed_a[0].funding_txo, closed_b[0].funding_txo); - assert_eq!(closed_a[0].counterparty_node_id, Some(node_b.node_id())); - assert_eq!(closed_b[0].counterparty_node_id, Some(node_a.node_id())); + assert_eq!(closed_a[0].counterparty_node_id, node_b.node_id()); + assert_eq!(closed_b[0].counterparty_node_id, node_a.node_id()); node_a.stop().unwrap(); println!("\nA stopped"); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 0ed27f81f..c7d360ec0 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -3206,7 +3206,7 @@ async fn closed_channel_history_persists_after_restart() { assert_eq!(record.channel_capacity_sats, Some(channel_amount_sat)); assert!(record.is_outbound); assert!(record.is_announced); - assert_eq!(record.counterparty_node_id, Some(node_b.node_id())); + assert_eq!(record.counterparty_node_id, node_b.node_id()); assert!(record.funding_txo.is_some()); assert_eq!(record.funding_txo.unwrap().txid, funding_txo.txid); assert!(record.closure_reason.is_some()); @@ -3219,7 +3219,7 @@ async fn closed_channel_history_persists_after_restart() { assert_eq!(record_b.channel_capacity_sats, Some(channel_amount_sat)); assert!(!record_b.is_outbound); assert!(record_b.is_announced); - assert_eq!(record_b.counterparty_node_id, Some(node_a.node_id())); + assert_eq!(record_b.counterparty_node_id, node_a.node_id()); assert!(record_b.funding_txo.is_some()); assert_eq!(record_b.funding_txo.unwrap().txid, funding_txo.txid); assert!(record_b.closure_reason.is_some()); diff --git a/tests/upgrade_downgrade_tests.rs b/tests/upgrade_downgrade_tests.rs index b30b5a33c..c3ee6868e 100644 --- a/tests/upgrade_downgrade_tests.rs +++ b/tests/upgrade_downgrade_tests.rs @@ -290,7 +290,7 @@ async fn expect_current_channel_pending( async fn expect_current_channel_ready(node: &CurrentNode, expected_counterparty: PublicKey) { match next_current_event(node).await { ldk_node::Event::ChannelReady { counterparty_node_id, .. } => { - assert_eq!(counterparty_node_id, Some(expected_counterparty)); + assert_eq!(counterparty_node_id, expected_counterparty); node.event_handled().unwrap(); }, event => panic!("{} got unexpected event: {:?}", node.node_id(), event),