oku_fs/
discovery.rs

1use crate::database::core::DATABASE;
2use crate::{error::OkuDiscoveryError, fs::OkuFs};
3use iroh_base::ticket::Ticket;
4use iroh_blobs::HashAndFormat;
5use iroh_docs::rpc::client::docs::ShareMode;
6use iroh_docs::sync::CapabilityKind;
7use iroh_docs::NamespaceId;
8use log::{error, info};
9use miette::IntoDiagnostic;
10use std::{path::PathBuf, time::Duration};
11use tokio::task::JoinSet;
12
/// How long to wait between successive republications of content to the Mainline DHT (one hour).
pub const REPUBLISH_DELAY: Duration = Duration::from_secs(3_600);

/// How long to wait before content is first published to the Mainline DHT (half a second).
pub const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);
18
19impl OkuFs {
20    /// Announces a writable replica to the Mainline DHT.
21    ///
22    /// # Arguments
23    ///
24    /// * `namespace_id` - The ID of the replica to announce.
25    pub async fn announce_mutable_replica(
26        &self,
27        namespace_id: &NamespaceId,
28    ) -> miette::Result<NamespaceId> {
29        let ticket = self
30            .create_document_ticket(namespace_id, &ShareMode::Read)
31            .await?
32            .to_bytes();
33        let newest_timestamp = self
34            .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
35            .await? as i64;
36        let replica_private_key = mainline::SigningKey::from_bytes(
37            &self
38                .create_document_ticket(namespace_id, &ShareMode::Write)
39                .await?
40                .capability
41                .secret_key()
42                .into_diagnostic()?
43                .to_bytes(),
44        );
45        let mutable_item =
46            mainline::MutableItem::new(replica_private_key, &ticket, newest_timestamp, None);
47        match self.dht.put_mutable(mutable_item, None).await {
48            Ok(_) => info!(
49                "Announced mutable replica {} … ",
50                crate::fs::util::fmt(namespace_id)
51            ),
52            Err(e) => error!(
53                "{}",
54                OkuDiscoveryError::ProblemAnnouncingContent(
55                    crate::fs::util::fmt(namespace_id),
56                    e.to_string()
57                )
58            ),
59        }
60        Ok(*namespace_id)
61    }
62
63    /// Announces a read-only replica to the Mainline DHT.
64    ///
65    /// # Arguments
66    ///
67    /// * `namespace_id` - The ID of the replica to announce.
68    pub async fn announce_immutable_replica(
69        &self,
70        namespace_id: &NamespaceId,
71    ) -> miette::Result<NamespaceId> {
72        let public_key_bytes = namespace_id
73            .into_public_key()
74            .map_err(|e| miette::miette!("{}", e))?
75            .as_bytes()
76            .to_vec();
77        let announcement = DATABASE
78            .get_announcement(&public_key_bytes)
79            .ok()
80            .flatten()
81            .ok_or(miette::miette!(
82                "Prior announcement not found in database for replica {} … ",
83                crate::fs::util::fmt(namespace_id)
84            ))?;
85
86        let ticket = self
87            .create_document_ticket(namespace_id, &ShareMode::Read)
88            .await?
89            .to_bytes();
90        let newest_timestamp = self
91            .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
92            .await? as i64;
93        let mutable_item = mainline::MutableItem::new_signed_unchecked(
94            announcement.key.try_into().map_err(|_e| {
95                miette::miette!("Replica announcement key does not fit into 32 bytes … ")
96            })?,
97            announcement.signature.try_into().map_err(|_e| {
98                miette::miette!("Replica announcement signature does not fit into 64 bytes … ")
99            })?,
100            &ticket,
101            newest_timestamp,
102            None,
103        );
104        match self.dht.put_mutable(mutable_item, None).await {
105            Ok(_) => info!(
106                "Announced immutable replica {} … ",
107                crate::fs::util::fmt(namespace_id)
108            ),
109            Err(e) => error!(
110                "{}",
111                OkuDiscoveryError::ProblemAnnouncingContent(
112                    crate::fs::util::fmt(namespace_id),
113                    e.to_string()
114                )
115            ),
116        }
117        Ok(*namespace_id)
118    }
119
120    /// Announces a replica to the Mainline DHT.
121    ///
122    /// # Arguments
123    ///
124    /// * `namespace_id` - The ID of the replica to announce.
125    ///
126    /// * `capability_kind` - Whether the replica is writable by the current node or read-only.
127    pub async fn announce_replica(
128        &self,
129        namespace_id: &NamespaceId,
130        capability_kind: &CapabilityKind,
131    ) -> miette::Result<NamespaceId> {
132        match capability_kind {
133            CapabilityKind::Read => self.announce_immutable_replica(namespace_id).await,
134            CapabilityKind::Write => self.announce_mutable_replica(namespace_id).await,
135        }
136    }
137
138    /// Announce the home replica
139    pub async fn announce_home_replica(&self) -> miette::Result<NamespaceId> {
140        let home_replica = self
141            .home_replica()
142            .await
143            .ok_or(miette::miette!("No home replica set … "))?;
144        let ticket = self
145            .create_document_ticket(&home_replica, &ShareMode::Read)
146            .await?
147            .to_bytes();
148        let newest_timestamp = self
149            .get_newest_timestamp_in_folder(&home_replica, &PathBuf::from("/"))
150            .await? as i64;
151        let author_private_key = mainline::SigningKey::from_bytes(
152            &self
153                .get_author()
154                .await
155                .map_err(|e| miette::miette!("{}", e))?
156                .to_bytes(),
157        );
158        let mutable_item =
159            mainline::MutableItem::new(author_private_key, &ticket, newest_timestamp, None);
160        match self.dht.put_mutable(mutable_item, None).await {
161            Ok(_) => info!(
162                "Announced home replica {} … ",
163                crate::fs::util::fmt(home_replica)
164            ),
165            Err(e) => error!(
166                "{}",
167                OkuDiscoveryError::ProblemAnnouncingContent(
168                    crate::fs::util::fmt(home_replica),
169                    e.to_string()
170                )
171            ),
172        }
173        Ok(home_replica)
174    }
175
176    /// Announces all writable replicas to the Mainline DHT.
177    pub async fn announce_replicas(&self) -> miette::Result<()> {
178        let mut future_set = JoinSet::new();
179
180        // Prepare to announce home replica
181        let self_clone = self.clone();
182        future_set.spawn(async move { self_clone.announce_home_replica().await });
183
184        // Prepare to announce all replicas
185        let replicas = self.list_replicas().await?;
186        for (replica, capability_kind) in replicas {
187            let self_clone = self.clone();
188            future_set.spawn(async move {
189                self_clone
190                    .announce_replica(&replica, &capability_kind)
191                    .await
192            });
193        }
194        info!("Pending announcements: {} … ", future_set.len());
195        // Execute announcements in parallel
196        while let Some(res) = future_set.join_next().await {
197            match res {
198                Ok(result) => match result {
199                    Ok(_) => (),
200                    Err(e) => error!("{}", e),
201                },
202                Err(e) => error!("{}", e),
203            }
204        }
205
206        Ok(())
207    }
208}
209
210/// From: <https://github.com/n0-computer/iroh-experiments/blob/4e052c6b34720e26683083270706926a84e49411/content-discovery/iroh-mainline-content-discovery/src/client.rs#L53>
211///
212/// The mapping from an iroh [HashAndFormat] to a bittorrent infohash, aka [mainline::Id].
213///
214/// Since an infohash is just 20 bytes, this can not be a bidirectional mapping.
215pub fn to_infohash(haf: &HashAndFormat) -> mainline::Id {
216    let mut data = [0u8; 20];
217    data.copy_from_slice(&haf.hash.as_bytes()[..20]);
218    mainline::Id::from_bytes(data).unwrap()
219}