Skip to main content

oku_fs/
discovery.rs

1use crate::database::core::DATABASE;
2use crate::database::dht::ReplicaAnnouncement;
3use crate::{error::OkuDiscoveryError, fs::OkuFs};
4use iroh_blobs::HashAndFormat;
5use iroh_docs::api::protocol::ShareMode;
6use iroh_docs::NamespaceId;
7use iroh_tickets::Ticket;
8use log::{debug, error, info};
9use miette::IntoDiagnostic;
10use std::{path::PathBuf, time::Duration};
11use tokio::task::JoinSet;
12
/// How long to wait between successive republications of content to the Mainline DHT (one hour).
pub const DEFAULT_REPUBLISH_DELAY: Duration = Duration::from_secs(3_600);

/// How long to wait before content is first published to the Mainline DHT (half a second).
pub const DEFAULT_INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);
18
19impl OkuFs {
20    /// Announces a replica to the Mainline DHT.
21    ///
22    /// # Arguments
23    ///
24    /// * `namespace_id` - The ID of the replica to announce.
25    pub async fn announce_replica(
26        &self,
27        namespace_id: &NamespaceId,
28    ) -> miette::Result<NamespaceId> {
29        let public_key_bytes = namespace_id
30            .into_public_key()
31            .map_err(|e| miette::miette!("{}", e))?
32            .as_bytes()
33            .to_vec();
34        let existing_announcement = DATABASE.get_announcement(&public_key_bytes).ok().flatten();
35
36        let ticket = self
37            .create_document_ticket(namespace_id, &ShareMode::Read)
38            .await?
39            .to_bytes();
40        let newest_timestamp = self
41            .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
42            .await? as i64;
43
44        // Ideally, we can repeat an announcement we've already heard for this replica
45        let mutable_item = match existing_announcement {
46            None => {
47                debug!(
48                    "Prior announcement not found in database for replica {} … ",
49                    crate::fs::util::fmt(namespace_id)
50                );
51                // Even if we don't have someone else's announcement saved, we can create our own if we have write access to the replica
52                let replica_private_key = mainline::SigningKey::from_bytes(
53                    &self
54                        .create_document_ticket(namespace_id, &ShareMode::Write)
55                        .await?
56                        .capability
57                        .secret_key()
58                        .into_diagnostic()?
59                        .to_bytes(),
60                );
61                mainline::MutableItem::new(replica_private_key, &ticket, newest_timestamp, None)
62            }
63            Some(announcement) => mainline::MutableItem::new_signed_unchecked(
64                announcement.key.try_into().map_err(|_e| {
65                    miette::miette!("Replica announcement key does not fit into 32 bytes … ")
66                })?,
67                announcement.signature.try_into().map_err(|_e| {
68                    miette::miette!("Replica announcement signature does not fit into 64 bytes … ")
69                })?,
70                &ticket,
71                newest_timestamp,
72                None,
73            ),
74        };
75        let replica_announcement = ReplicaAnnouncement {
76            key: mutable_item.key().to_vec(),
77            signature: mutable_item.signature().to_vec(),
78        };
79        match self.dht.put_mutable(mutable_item, None).await {
80            Ok(_) => {
81                info!(
82                    "Announced replica {} … ",
83                    crate::fs::util::fmt(namespace_id)
84                );
85                if let Err(e) = DATABASE.upsert_announcement(&replica_announcement) {
86                    error!("{e}");
87                }
88            }
89            Err(e) => error!(
90                "{}",
91                OkuDiscoveryError::ProblemAnnouncingContent(
92                    crate::fs::util::fmt(namespace_id),
93                    e.to_string()
94                )
95            ),
96        }
97        Ok(*namespace_id)
98    }
99
100    /// Announces read-only tickets for all known replicas to the Mainline DHT.
101    pub async fn announce_replicas(&self) -> miette::Result<()> {
102        let mut future_set = JoinSet::new();
103
104        // Prepare to announce all replicas
105        let replicas = self.list_replicas().await?;
106        for (replica, _capability_kind, _is_home_replica) in replicas {
107            let self_clone = self.clone();
108            future_set.spawn(async move { self_clone.announce_replica(&replica).await });
109        }
110        info!("Pending announcements: {} … ", future_set.len());
111        // Execute announcements in parallel
112        while let Some(res) = future_set.join_next().await {
113            match res {
114                Ok(result) => match result {
115                    Ok(_) => (),
116                    Err(e) => error!("{}", e),
117                },
118                Err(e) => error!("{}", e),
119            }
120        }
121
122        Ok(())
123    }
124}
125
126/// From: <https://github.com/n0-computer/iroh-experiments/blob/4e052c6b34720e26683083270706926a84e49411/content-discovery/iroh-mainline-content-discovery/src/client.rs#L53>
127///
128/// The mapping from an iroh [HashAndFormat] to a bittorrent infohash, aka [mainline::Id].
129///
130/// Since an infohash is just 20 bytes, this can not be a bidirectional mapping.
131pub fn to_infohash(haf: &HashAndFormat) -> mainline::Id {
132    let mut data = [0u8; 20];
133    data.copy_from_slice(&haf.hash.as_bytes()[..20]);
134    mainline::Id::from_bytes(data).unwrap()
135}