14 constexpr
const char *kPropagatedTransactions =
15 "kagome_sync_propagated_transactions";
26 std::shared_ptr<consensus::babe::Babe> babe,
27 std::shared_ptr<ExtrinsicObserver> extrinsic_observer,
28 std::shared_ptr<StreamEngine> stream_engine,
29 std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
30 extrinsic_events_engine,
31 std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
36 "PropagateTransactionsProtocol"),
38 babe_(std::move(babe)),
50 kPropagatedTransactions,
51 "Number of transactions propagated to at least one peer");
65 std::shared_ptr<Stream> stream) {
66 BOOST_ASSERT(stream->remotePeerId().has_value());
71 [wp = weak_from_this(), stream](outcome::result<void> res) {
72 auto self = wp.lock();
78 auto peer_id = stream->remotePeerId().value();
80 if (not res.has_value()) {
81 SL_VERBOSE(self->base_.logger(),
82 "Handshake failed on incoming {} stream with {}: {}",
85 res.error().message());
90 res =
self->stream_engine_->addIncoming(stream,
self);
91 if (not res.has_value()) {
92 SL_VERBOSE(self->base_.logger(),
93 "Can't register incoming {} stream with {}: {}",
96 res.error().message());
101 SL_VERBOSE(self->base_.logger(),
102 "Fully established incoming {} stream with {}",
103 self->protocolName(),
110 std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
114 [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
115 auto &&stream_res)
mutable {
116 auto self = wp.lock();
122 if (not stream_res.has_value()) {
123 SL_VERBOSE(self->base_.logger(),
124 "Can't create outgoing {} stream with {}: {}",
125 self->protocolName(),
127 stream_res.error().message());
128 cb(stream_res.as_failure());
131 const auto &stream_and_proto = stream_res.value();
134 stream = stream_and_proto.stream,
135 protocol = stream_and_proto.protocol,
136 cb = std::move(cb)](outcome::result<void> res) {
137 auto self = wp.lock();
143 if (not res.has_value()) {
144 SL_VERBOSE(self->base_.logger(),
145 "Handshake failed on outgoing {} stream with {}: {}",
147 stream->remotePeerId().value(),
148 res.error().message());
150 cb(res.as_failure());
154 res =
self->stream_engine_->addOutgoing(stream,
self);
155 if (not res.has_value()) {
156 SL_VERBOSE(self->base_.logger(),
157 "Can't register outgoing {} stream with {}: {}",
159 stream->remotePeerId().value(),
160 res.error().message());
162 cb(res.as_failure());
166 SL_VERBOSE(self->base_.logger(),
167 "Fully established outgoing {} stream with {}",
169 stream->remotePeerId().value());
170 cb(std::move(stream));
173 self->writeHandshake(std::move(stream_and_proto.stream),
180 std::shared_ptr<Stream> stream,
182 std::function<
void(outcome::result<void>)> &&cb) {
183 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
185 read_writer->read<
NoData>(
186 [stream, direction, wp = weak_from_this(), cb = std::move(cb)](
187 auto &&remote_handshake_res)
mutable {
188 auto self = wp.lock();
195 if (not remote_handshake_res.has_value()) {
196 SL_VERBOSE(self->base_.logger(),
197 "Can't read handshake from {}: {}",
198 stream->remotePeerId().value(),
199 remote_handshake_res.error().message());
201 cb(remote_handshake_res.as_failure());
205 SL_TRACE(self->base_.logger(),
206 "Handshake has received from {}",
207 stream->remotePeerId().value());
211 cb(outcome::success());
214 self->writeHandshake(
222 std::shared_ptr<Stream> stream,
224 std::function<
void(outcome::result<void>)> &&cb) {
225 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
227 read_writer->write(
NoData{},
228 [stream = std::move(stream),
230 wp = weak_from_this(),
231 cb = std::move(cb)](
auto &&write_res)
mutable {
232 auto self = wp.lock();
239 if (not write_res.has_value()) {
240 SL_VERBOSE(self->base_.logger(),
241 "Can't send handshake to {}: {}",
242 stream->remotePeerId().value(),
243 write_res.error().message());
245 cb(write_res.as_failure());
249 SL_TRACE(self->base_.logger(),
250 "Handshake has sent to {}",
251 stream->remotePeerId().value());
255 self->readHandshake(std::move(stream),
260 cb(outcome::success());
261 self->readPropagatedExtrinsics(std::move(stream));
268 std::shared_ptr<Stream> stream) {
269 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
272 wp = weak_from_this()](
273 auto &&message_res)
mutable {
274 auto self = wp.lock();
280 if (not message_res.has_value()) {
281 SL_VERBOSE(self->base_.logger(),
282 "Can't read propagated transactions from {}: {}",
283 stream->remotePeerId().value(),
284 message_res.error().message());
289 auto peer_id = stream->remotePeerId().value();
290 auto &message = message_res.value();
292 SL_VERBOSE(self->base_.logger(),
293 "Received {} propagated transactions from {}",
294 message.extrinsics.size(),
297 if (self->babe_->wasSynchronized()) {
298 for (
auto &ext : message.extrinsics) {
299 auto result =
self->extrinsic_observer_->onTxMessage(ext);
301 SL_DEBUG(self->base_.logger(),
" Received tx {}", result.value());
303 SL_DEBUG(self->base_.logger(),
305 result.error().message());
309 SL_TRACE(self->base_.logger(),
310 "Skipping extrinsics processing since the node was not in a " 311 "synchronized state yet.");
314 self->readPropagatedExtrinsics(std::move(stream));
319 gsl::span<const primitives::Transaction> txs) {
321 base_.
logger(),
"Propagate transactions : {} extrinsics", txs.size());
323 std::vector<libp2p::peer::PeerId> peers;
326 peers.push_back(peer_id);
328 if (peers.size() > 1) {
330 for (
const auto &tx : txs) {
335 key.value(), peers));
343 txs.begin(), txs.end(), exts.
extrinsics.begin(), [](
auto &tx) {
349 (*shared_msg) = std::move(exts);
void readPropagatedExtrinsics(std::shared_ptr< Stream > stream)
void readHandshake(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
static ExtrinsicLifecycleEvent Broadcast(SubscribedExtrinsicId id, gsl::span< const libp2p::peer::PeerId > peers)
metrics::RegistryPtr metrics_registry_
std::shared_ptr< consensus::babe::Babe > babe_
void writeHandshake(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
log::Logger const & logger() const
std::shared_ptr< subscription::ExtrinsicEventKeyRepository > ext_event_key_repo_
libp2p::peer::PeerInfo PeerInfo
std::shared_ptr< StreamEngine > stream_engine_
libp2p::peer::PeerId PeerId
KAGOME_DEFINE_CACHE(BlockAnnounceProtocol)
virtual int32_t luckyPeers() const =0
#define KAGOME_EXTRACT_SHARED_CACHE(prefix, type)
std::shared_ptr< primitives::events::ExtrinsicSubscriptionEngine > extrinsic_events_engine_
virtual void inc()=0
Increment the counter by 1.
virtual const std::string & protocolId() const =0
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
void onIncomingStream(std::shared_ptr< Stream > stream) override
Protocols const & protocolIds() const
const libp2p::peer::Protocol kPropagateTransactionsProtocol
bool start(std::weak_ptr< T > wptr)
void propagateTransactions(gsl::span< const primitives::Transaction > txs)
std::shared_ptr< ExtrinsicObserver > extrinsic_observer_
std::vector< primitives::Extrinsic > extrinsics
const application::AppConfiguration & app_config_
PropagateTransactionsProtocol()=delete
metrics::Counter * metric_propagated_tx_counter_