Endorse htlc and local reputation #2716

Open · wants to merge 4 commits into master
Changes from 2 commits
19 changes: 19 additions & 0 deletions docs/release-notes/eclair-vnext.md
@@ -24,6 +24,25 @@ Existing `static_remote_key` channels will continue to work. You can override th

Eclair will not allow remote peers to open new obsolete channels that do not support `option_static_remotekey`.

### Local reputation and HTLC endorsement

To protect against jamming attacks, eclair assigns a reputation to its neighbors and uses it to decide whether an HTLC should be relayed, given how congested the outgoing channel is.
The reputation is essentially how much that node paid us in fees divided by how much it should have paid us for the liquidity and slots that it blocked.
The reputation is tracked per incoming node and endorsement level.
The confidence that the HTLC will be fulfilled is transmitted to the next node using the endorsement TLV of the `update_add_htlc` message.

To configure, edit `eclair.conf`:
```eclair.conf
eclair.local-reputation {
# Reputation decays with the following half life to emphasize recent behavior.
half-life = 7 days
# HTLCs that stay pending for longer than this get penalized
good-htlc-duration = 12 seconds
  # How much to penalize pending HTLCs. A pending HTLC is considered equivalent to this many fast-failing HTLCs.
pending-multiplier = 1000
}
```
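
The release notes above describe the score informally; a minimal sketch of that ratio, with exponential decay and the pending-HTLC penalty, might look like the following (illustrative names only, not the `Reputation` implementation added by this PR):

```scala
import scala.concurrent.duration._

// Illustrative sketch of the reputation formula described above; names and structure are
// assumptions, not the actual code added by this PR.
object ReputationSketch {
  // Exponential decay: an HTLC settled `age` ago keeps weight 2^(-age / halfLife).
  def decay(age: FiniteDuration, halfLife: FiniteDuration): Double =
    math.pow(2.0, -age.toMillis.toDouble / halfLife.toMillis.toDouble)

  // Reputation ~= decayed fees actually paid / decayed fees that should have been paid,
  // where each pending HTLC counts as `pendingMultiplier` failed ones.
  def reputation(settled: Seq[(Long, Boolean, FiniteDuration)], // (fee in msat, fulfilled?, age)
                 pendingFeesMsat: Seq[Long],
                 pendingMultiplier: Double,
                 halfLife: FiniteDuration): Double = {
    val earned = settled.collect { case (fee, true, age) => fee * decay(age, halfLife) }.sum
    val asked = settled.map { case (fee, _, age) => fee * decay(age, halfLife) }.sum +
      pendingFeesMsat.map(_ * pendingMultiplier).sum
    if (asked == 0.0) 0.0 else earned / asked
  }
}
```

The per-node, per-endorsement bucketing mentioned above would then simply mean keeping one such score per incoming node and endorsement value.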

### API changes

<insert changes>
12 changes: 12 additions & 0 deletions eclair-core/src/main/resources/reference.conf
@@ -237,6 +237,18 @@ eclair {
// Number of blocks before the incoming HTLC expires that an async payment must be triggered by the receiver
cancel-safety-before-timeout-blocks = 144
}

// We assign reputations to our peers to prioritize HTLCs during congestion.
// The reputation is computed as fees paid divided by what should have been paid if all HTLCs were successful.
peer-reputation {
// Reputation decays with the following half life to emphasize recent behavior.
half-life = 7 days
// HTLCs that stay pending for longer than this get penalized
max-htlc-relay-duration = 12 seconds
// Pending HTLCs are counted as failed, and because they could potentially stay pending for a very long time, the
// following multiplier is applied.
Review comment (Member): Thanks, this is much clearer. I'm wondering how you came up with this default value though? Can you explain it and provide some guidance on what node operators should take into account when they change that default value?

pending-multiplier = 1000 // A pending HTLC counts as a thousand failed ones.
}
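
To get a feel for the default, under one plausible reading of the comments above (a pending HTLC adds `pending-multiplier` times its expected fee to the denominator of the reputation ratio):

```scala
// Hypothetical numbers, only to illustrate the order of magnitude of pending-multiplier.
val earnedFeesMsat = 100000.0    // decayed fees this peer has actually paid us recently
val pendingFeeMsat = 1000.0      // expected fee of a single HTLC currently stuck pending
val pendingMultiplier = 1000.0
val reputation = earnedFeesMsat / (earnedFeesMsat + pendingFeeMsat * pendingMultiplier)
// ~= 0.09: one long-pending HTLC outweighs roughly a thousand successful relays of similar fee,
// which is the trade-off node operators should keep in mind when tuning this value.
```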
}

on-chain-fees {
8 changes: 7 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
@@ -31,6 +31,7 @@ import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy
import fr.acinq.eclair.io.PeerConnection
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.reputation.Reputation.ReputationConfig
import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios}
import fr.acinq.eclair.router.Router._
@@ -561,7 +562,12 @@ object NodeParams extends Logging {
privateChannelFees = getRelayFees(config.getConfig("relay.fees.private-channels")),
minTrampolineFees = getRelayFees(config.getConfig("relay.fees.min-trampoline")),
enforcementDelay = FiniteDuration(config.getDuration("relay.fees.enforcement-delay").getSeconds, TimeUnit.SECONDS),
asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks)
asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks),
peerReputationConfig = ReputationConfig(
FiniteDuration(config.getDuration("relay.peer-reputation.half-life").getSeconds, TimeUnit.SECONDS),
FiniteDuration(config.getDuration("relay.peer-reputation.max-htlc-relay-duration").getSeconds, TimeUnit.SECONDS),
config.getDouble("relay.peer-reputation.pending-multiplier"),
),
),
db = database,
autoReconnect = config.getBoolean("auto-reconnect"),
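
`ReputationConfig` itself is not shown in this diff; judging from the three arguments passed above, it presumably looks roughly like this (field names are guesses):

```scala
import scala.concurrent.duration.FiniteDuration

// Assumed shape, inferred from the constructor call above (not part of this diff).
case class ReputationConfig(halfLife: FiniteDuration,
                            maxRelayDuration: FiniteDuration,
                            pendingMultiplier: Double)
```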
4 changes: 3 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
@@ -44,6 +44,7 @@ import fr.acinq.eclair.payment.offer.OfferManager
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
import fr.acinq.eclair.wire.protocol.NodeAddress
@@ -360,7 +361,8 @@ class Setup(val datadir: File,
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
reputationRecorder = system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")
Review comment (@t-bast, Member, Aug 2, 2024): We must make this optional and let node operators disable it entirely if they'd like to run the standard blip-0004 relay. I've done this in #2893.

relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
_ = relayer ! PostRestartHtlcCleaner.Init(channels)
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
@@ -118,6 +118,8 @@ case class ExpiryTooBig (override val channelId: Byte
case class HtlcValueTooSmall (override val channelId: ByteVector32, minimum: MilliSatoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"htlc value too small: minimum=$minimum actual=$actual")
case class HtlcValueTooHighInFlight (override val channelId: ByteVector32, maximum: MilliSatoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"in-flight htlcs hold too much value: maximum=$maximum actual=$actual")
case class TooManyAcceptedHtlcs (override val channelId: ByteVector32, maximum: Long) extends ChannelException(channelId, s"too many accepted htlcs: maximum=$maximum")
case class TooManySmallHtlcs (override val channelId: ByteVector32, number: Long, below: MilliSatoshi) extends ChannelException(channelId, s"too many small htlcs: $number HTLCs below $below")
case class ConfidenceTooLow (override val channelId: ByteVector32, confidence: Double, occupancy: Double) extends ChannelException(channelId, s"confidence too low: confidence=$confidence occupancy=$occupancy")
case class LocalDustHtlcExposureTooHigh (override val channelId: ByteVector32, maximum: Satoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"dust htlcs hold too much value: maximum=$maximum actual=$actual")
case class RemoteDustHtlcExposureTooHigh (override val channelId: ByteVector32, maximum: Satoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"dust htlcs hold too much value: maximum=$maximum actual=$actual")
case class InsufficientFunds (override val channelId: ByteVector32, amount: MilliSatoshi, missing: Satoshi, reserve: Satoshi, fees: Satoshi) extends ChannelException(channelId, s"insufficient funds: missing=$missing reserve=$reserve fees=$fees")
@@ -429,7 +429,7 @@ case class Commitment(fundingTxIndex: Long,
localCommit.spec.htlcs.collect(DirectedHtlc.incoming).filter(nearlyExpired)
}

def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Unit] = {
def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf, confidence: Double): Either[ChannelException, Unit] = {
// we allowed mismatches between our feerates and our remote's as long as commitments didn't contain any HTLC at risk
// we need to verify that we're not disagreeing on feerates anymore before offering new HTLCs
// NB: there may be a pending update_fee that hasn't been applied yet that needs to be taken into account
@@ -488,7 +488,8 @@
if (allowedHtlcValueInFlight < htlcValueInFlight) {
return Left(HtlcValueTooHighInFlight(params.channelId, maximum = allowedHtlcValueInFlight, actual = htlcValueInFlight))
}
if (Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min < outgoingHtlcs.size) {
val maxAcceptedHtlcs = params.localParams.maxAcceptedHtlcs.min(params.remoteParams.maxAcceptedHtlcs)
if (maxAcceptedHtlcs < outgoingHtlcs.size) {
return Left(TooManyAcceptedHtlcs(params.channelId, maximum = Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min))
}

@@ -505,6 +506,18 @@
return Left(RemoteDustHtlcExposureTooHigh(params.channelId, maxDustExposure, remoteDustExposureAfterAdd))
}

// Jamming protection
// These must be the last checks so that they can be ignored for shadow deployment.
for ((amountMsat, i) <- outgoingHtlcs.toSeq.map(_.amountMsat).sorted.zipWithIndex) {
if ((amountMsat.toLong < 1) || (math.log(amountMsat.toLong.toDouble) * maxAcceptedHtlcs / math.log(params.localParams.maxHtlcValueInFlightMsat.toLong.toDouble / maxAcceptedHtlcs) < i)) {
return Left(TooManySmallHtlcs(params.channelId, number = i + 1, below = amountMsat))
}
}
val occupancy = (outgoingHtlcs.size.toDouble / maxAcceptedHtlcs).max(htlcValueInFlight.toLong.toDouble / allowedHtlcValueInFlight.toLong.toDouble)
if (confidence + 0.05 < occupancy) {
return Left(ConfidenceTooLow(params.channelId, confidence, occupancy))
}

Right(())
}
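
For intuition, the small-HTLC check above is equivalent to requiring the i-th smallest pending HTLC (sorted by amount, 0-based) to be at least `(maxHtlcValueInFlight / maxAcceptedHtlcs)^(i / maxAcceptedHtlcs)` msat, so a few tiny HTLCs are always allowed but filling many slots requires exponentially larger amounts. A standalone sketch with made-up channel parameters:

```scala
// Illustrative only: the channel parameters below are made up, not eclair defaults.
val maxAcceptedHtlcs = 30
val maxHtlcValueInFlightMsat = 3000000000L // 0.03 BTC

// Minimum amount allowed for the i-th smallest pending HTLC (i is 0-based).
def minAmountMsat(i: Int): Double =
  math.pow(maxHtlcValueInFlightMsat.toDouble / maxAcceptedHtlcs, i.toDouble / maxAcceptedHtlcs)

// minAmountMsat(0)  = 1 msat            (a handful of tiny HTLCs is always fine)
// minAmountMsat(10) ~= 464 msat
// minAmountMsat(20) ~= 215443 msat
// minAmountMsat(29) ~= 5.4e7 msat (~54,000 sat)
```

The occupancy check that follows is simpler: since the HTLC is rejected when `confidence + 0.05 < occupancy`, an HTLC relayed with confidence 0.5 is dropped once the outgoing channel is more than roughly 55% full, by slots or by value, whichever is higher.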

@@ -552,6 +565,14 @@
return Left(TooManyAcceptedHtlcs(params.channelId, maximum = params.localParams.maxAcceptedHtlcs))
}

// Jamming protection
// These must be the last checks so that they can be ignored for shadow deployment.
for ((amountMsat, i) <- incomingHtlcs.toSeq.map(_.amountMsat).sorted.zipWithIndex) {
if ((amountMsat.toLong < 1) || (math.log(amountMsat.toLong.toDouble) * params.localParams.maxAcceptedHtlcs / math.log(params.localParams.maxHtlcValueInFlightMsat.toLong.toDouble / params.localParams.maxAcceptedHtlcs) < i)) {
return Left(TooManySmallHtlcs(params.channelId, number = i + 1, below = amountMsat))
}
}
Review comment (Member): We can't have jamming protection in receiveAdd: an error here would trigger a force-close. This is only used when our peer violated some channel constraints they should be aware of. This isn't the case for jamming, where we apply a local (potentially dynamic) heuristic: we should accept those HTLCs and fail them instead of relaying them, but not raise an error in receiveAdd.


Right(())
}

@@ -835,7 +856,7 @@ case class Commitments(params: ChannelParams,
* @param cmd add HTLC command
* @return either Left(failure, error message) where failure is a failure message (see BOLT #4 and the Failure Message class) or Right(new commitments, updateAddHtlc)
*/
def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, (Commitments, UpdateAddHtlc)] = {
def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf)(implicit log: LoggingAdapter): Either[ChannelException, (Commitments, UpdateAddHtlc)] = {
// we must ensure we're not relaying htlcs that are already expired, otherwise the downstream channel will instantly close
// NB: we add a 3 blocks safety to reduce the probability of running into this when our bitcoin node is slightly outdated
val minExpiry = CltvExpiry(currentHeight + 3)
@@ -859,12 +880,28 @@
val changes1 = changes.addLocalProposal(add).copy(localNextHtlcId = changes.localNextHtlcId + 1)
val originChannels1 = originChannels + (add.id -> cmd.origin)
// we verify that this htlc is allowed in every active commitment
active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf))
.collectFirst { case Left(f) => Left(f) }
val canSendAdds = active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf, cmd.confidence))
// Log only for jamming protection.
canSendAdds.collectFirst {
case Left(f: TooManySmallHtlcs) =>
log.info("TooManySmallHtlcs: {} outgoing HTLCs are below {}}", f.number, f.below)
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
case Left(f: ConfidenceTooLow) =>
log.info("ConfidenceTooLow: confidence is {}% while channel is {}% full", (100 * f.confidence).toInt, (100 * f.occupancy).toInt)
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
}
canSendAdds.flatMap { // TODO: Jamming protection is currently ignored; delete this flatMap to activate it.
case Left(_: TooManySmallHtlcs) | Left(_: ConfidenceTooLow) => None
case x => Some(x)
}
.collectFirst { case Left(f) =>
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
Left(f)
}
Comment on lines +875 to +892
Review comment (Member): This seems overly complex: can't you keep sendAdd unchanged, and in canSendAdd, instead of returning TooManySmallHtlcs or ConfidenceTooLow, simply write the log line and increment the metric?

This will duplicate the log/metric when we have pending splices, but pending splices is something that won't happen frequently enough to skew the data?

Or even better, since HTLCs are the same in all active commitments, you can remove the changes to canSendAdd and here in sendAdd, when canSendAdd returns a success, you can evaluate the jamming conditions and log and increment the metric if needed.

Reply (Member Author): I've done this in #2898, but I don't like it because it just duplicates a lot of canSendAdd's code. I believe that jamming checks, like the other checks, should be done in canSendAdd. This piece of code may look overly complex, but the idea is to get rid of it when we deploy jamming protection, at which point TooManySmallHtlcs and ConfidenceTooLow will just be treated like any other error.

.getOrElse(Right(copy(changes = changes1, originChannels = originChannels1), add))
}

def receiveAdd(add: UpdateAddHtlc, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Commitments] = {
def receiveAdd(add: UpdateAddHtlc, feerates: FeeratesPerKw, feeConf: OnChainFeeConf)(implicit log: LoggingAdapter): Either[ChannelException, Commitments] = {
if (add.id != changes.remoteNextHtlcId) {
return Left(UnexpectedHtlcId(channelId, expected = changes.remoteNextHtlcId, actual = add.id))
}
Expand All @@ -877,8 +914,21 @@ case class Commitments(params: ChannelParams,

val changes1 = changes.addRemoteProposal(add).copy(remoteNextHtlcId = changes.remoteNextHtlcId + 1)
// we verify that this htlc is allowed in every active commitment
active.map(_.canReceiveAdd(add.amountMsat, params, changes1, feerates, feeConf))
.collectFirst { case Left(f) => Left(f) }
val canReceiveAdds = active.map(_.canReceiveAdd(add.amountMsat, params, changes1, feerates, feeConf))
// Log only for jamming protection.
canReceiveAdds.collectFirst {
case Left(f: TooManySmallHtlcs) =>
log.info("TooManySmallHtlcs: {} incoming HTLCs are below {}}", f.number, f.below)
Metrics.dropHtlc(f, Tags.Directions.Incoming)
}
canReceiveAdds.flatMap { // TODO: Jamming protection is currently ignored; delete this flatMap to activate it.
case Left(_: TooManySmallHtlcs) | Left(_: ConfidenceTooLow) => None
case x => Some(x)
}
.collectFirst { case Left(f) =>
Metrics.dropHtlc(f, Tags.Directions.Incoming)
Left(f)
}
.getOrElse(Right(copy(changes = changes1)))
}

@@ -35,6 +35,7 @@ object Monitoring {
val RemoteFeeratePerByte = Kamon.histogram("channels.remote-feerate-per-byte")
val Splices = Kamon.histogram("channels.splices", "Splices")
val ProcessMessage = Kamon.timer("channels.messages-processed")
val HtlcDropped = Kamon.counter("channels.htlc-dropped")

def recordHtlcsInFlight(remoteSpec: CommitmentSpec, previousRemoteSpec: CommitmentSpec): Unit = {
for (direction <- Tags.Directions.Incoming :: Tags.Directions.Outgoing :: Nil) {
@@ -75,6 +76,10 @@
Metrics.Splices.withTag(Tags.Origin, Tags.Origins.Remote).withTag(Tags.SpliceType, Tags.SpliceTypes.SpliceCpfp).record(Math.abs(fundingParams.remoteContribution.toLong))
}
}

def dropHtlc(reason: ChannelException, direction: String): Unit = {
HtlcDropped.withTag(Tags.Reason, reason.getClass.getSimpleName).withTag(Tags.Direction, direction).increment()
}
}

object Tags {
@@ -85,6 +90,7 @@
val State = "state"
val CommitmentFormat = "commitment-format"
val SpliceType = "splice-type"
val Reason = "reason"

object Events {
val Created = "created"