oku_fs/
discovery.rs

1use crate::database::core::DATABASE;
2use crate::{error::OkuDiscoveryError, fs::OkuFs};
3use iroh_base::ticket::Ticket;
4use iroh_blobs::HashAndFormat;
5use iroh_docs::rpc::client::docs::ShareMode;
6use iroh_docs::sync::CapabilityKind;
7use iroh_docs::NamespaceId;
8use log::{error, info};
9use miette::IntoDiagnostic;
10use std::{path::PathBuf, time::Duration};
11use tokio::task::JoinSet;
12
13/// The delay between republishing content to the Mainline DHT.
14pub const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);
15
16/// The initial delay before publishing content to the Mainline DHT.
17pub const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);
18
19impl OkuFs {
20    /// Announces a writable replica to the Mainline DHT.
21    ///
22    /// # Arguments
23    ///
24    /// * `namespace_id` - The ID of the replica to announce.
25    pub async fn announce_mutable_replica(
26        &self,
27        namespace_id: &NamespaceId,
28    ) -> miette::Result<NamespaceId> {
29        let ticket = mainline::Bytes::from(
30            self.create_document_ticket(namespace_id, &ShareMode::Read)
31                .await?
32                .to_bytes(),
33        );
34        let newest_timestamp = self
35            .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
36            .await? as i64;
37        let replica_private_key = mainline::SigningKey::from_bytes(
38            &self
39                .create_document_ticket(namespace_id, &ShareMode::Write)
40                .await?
41                .capability
42                .secret_key()
43                .into_diagnostic()?
44                .to_bytes(),
45        );
46        let mutable_item =
47            mainline::MutableItem::new(replica_private_key, ticket, newest_timestamp, None);
48        match self.dht.put_mutable(mutable_item).await {
49            Ok(_) => info!(
50                "Announced mutable replica {} … ",
51                crate::fs::util::fmt(namespace_id)
52            ),
53            Err(e) => error!(
54                "{}",
55                OkuDiscoveryError::ProblemAnnouncingContent(
56                    crate::fs::util::fmt(namespace_id),
57                    e.to_string()
58                )
59            ),
60        }
61        Ok(*namespace_id)
62    }
63
64    /// Announces a read-only replica to the Mainline DHT.
65    ///
66    /// # Arguments
67    ///
68    /// * `namespace_id` - The ID of the replica to announce.
69    pub async fn announce_immutable_replica(
70        &self,
71        namespace_id: &NamespaceId,
72    ) -> miette::Result<NamespaceId> {
73        let public_key_bytes = namespace_id
74            .into_public_key()
75            .map_err(|e| miette::miette!("{}", e))?
76            .as_bytes()
77            .to_vec();
78        let announcement = DATABASE
79            .get_announcement(&public_key_bytes)
80            .ok()
81            .flatten()
82            .ok_or(miette::miette!(
83                "Prior announcement not found in database for replica {} … ",
84                crate::fs::util::fmt(namespace_id)
85            ))?;
86
87        let ticket = mainline::Bytes::from(
88            self.create_document_ticket(namespace_id, &ShareMode::Read)
89                .await?
90                .to_bytes(),
91        );
92        let newest_timestamp = self
93            .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
94            .await? as i64;
95        let mutable_item = mainline::MutableItem::new_signed_unchecked(
96            announcement.key.try_into().map_err(|_e| {
97                miette::miette!("Replica announcement key does not fit into 32 bytes … ")
98            })?,
99            announcement.signature.try_into().map_err(|_e| {
100                miette::miette!("Replica announcement signature does not fit into 64 bytes … ")
101            })?,
102            ticket,
103            newest_timestamp,
104            None,
105        );
106        match self.dht.put_mutable(mutable_item).await {
107            Ok(_) => info!(
108                "Announced immutable replica {} … ",
109                crate::fs::util::fmt(namespace_id)
110            ),
111            Err(e) => error!(
112                "{}",
113                OkuDiscoveryError::ProblemAnnouncingContent(
114                    crate::fs::util::fmt(namespace_id),
115                    e.to_string()
116                )
117            ),
118        }
119        Ok(*namespace_id)
120    }
121
122    /// Announces a replica to the Mainline DHT.
123    ///
124    /// # Arguments
125    ///
126    /// * `namespace_id` - The ID of the replica to announce.
127    ///
128    /// * `capability_kind` - Whether the replica is writable by the current node or read-only.
129    pub async fn announce_replica(
130        &self,
131        namespace_id: &NamespaceId,
132        capability_kind: &CapabilityKind,
133    ) -> miette::Result<NamespaceId> {
134        match capability_kind {
135            CapabilityKind::Read => self.announce_immutable_replica(namespace_id).await,
136            CapabilityKind::Write => self.announce_mutable_replica(namespace_id).await,
137        }
138    }
139
140    /// Announce the home replica
141    pub async fn announce_home_replica(&self) -> miette::Result<NamespaceId> {
142        let home_replica = self
143            .home_replica()
144            .await
145            .ok_or(miette::miette!("No home replica set … "))?;
146        let ticket = mainline::Bytes::from(
147            self.create_document_ticket(&home_replica, &ShareMode::Read)
148                .await?
149                .to_bytes(),
150        );
151        let newest_timestamp = self
152            .get_newest_timestamp_in_folder(&home_replica, &PathBuf::from("/"))
153            .await? as i64;
154        let author_private_key = mainline::SigningKey::from_bytes(
155            &self
156                .get_author()
157                .await
158                .map_err(|e| miette::miette!("{}", e))?
159                .to_bytes(),
160        );
161        let mutable_item =
162            mainline::MutableItem::new(author_private_key, ticket, newest_timestamp, None);
163        match self.dht.put_mutable(mutable_item).await {
164            Ok(_) => info!(
165                "Announced home replica {} … ",
166                crate::fs::util::fmt(home_replica)
167            ),
168            Err(e) => error!(
169                "{}",
170                OkuDiscoveryError::ProblemAnnouncingContent(
171                    crate::fs::util::fmt(home_replica),
172                    e.to_string()
173                )
174            ),
175        }
176        Ok(home_replica)
177    }
178
179    /// Announces all writable replicas to the Mainline DHT.
180    pub async fn announce_replicas(&self) -> miette::Result<()> {
181        let mut future_set = JoinSet::new();
182
183        // Prepare to announce home replica
184        let self_clone = self.clone();
185        future_set.spawn(async move { self_clone.announce_home_replica().await });
186
187        // Prepare to announce all replicas
188        let replicas = self.list_replicas().await?;
189        for (replica, capability_kind) in replicas {
190            let self_clone = self.clone();
191            future_set.spawn(async move {
192                self_clone
193                    .announce_replica(&replica, &capability_kind)
194                    .await
195            });
196        }
197        info!("Pending announcements: {} … ", future_set.len());
198        // Execute announcements in parallel
199        while let Some(res) = future_set.join_next().await {
200            match res {
201                Ok(result) => match result {
202                    Ok(_) => (),
203                    Err(e) => error!("{}", e),
204                },
205                Err(e) => error!("{}", e),
206            }
207        }
208
209        Ok(())
210    }
211}
212
213/// From: <https://github.com/n0-computer/iroh-experiments/blob/4e052c6b34720e26683083270706926a84e49411/content-discovery/iroh-mainline-content-discovery/src/client.rs#L53>
214///
215/// The mapping from an iroh [HashAndFormat] to a bittorrent infohash, aka [mainline::Id].
216///
217/// Since an infohash is just 20 bytes, this can not be a bidirectional mapping.
218pub fn to_infohash(haf: &HashAndFormat) -> mainline::Id {
219    let mut data = [0u8; 20];
220    data.copy_from_slice(&haf.hash.as_bytes()[..20]);
221    mainline::Id::from_bytes(data).unwrap()
222}