1use crate::database::core::DATABASE;
2use crate::database::dht::ReplicaAnnouncement;
3use crate::{error::OkuDiscoveryError, fs::OkuFs};
4use iroh_blobs::HashAndFormat;
5use iroh_docs::api::protocol::ShareMode;
6use iroh_docs::NamespaceId;
7use iroh_tickets::Ticket;
8use log::{debug, error, info};
9use miette::IntoDiagnostic;
10use std::{path::PathBuf, time::Duration};
11use tokio::task::JoinSet;
12
13pub const DEFAULT_REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);
15
16pub const DEFAULT_INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);
18
19impl OkuFs {
20 pub async fn announce_replica(
26 &self,
27 namespace_id: &NamespaceId,
28 ) -> miette::Result<NamespaceId> {
29 let public_key_bytes = namespace_id
30 .into_public_key()
31 .map_err(|e| miette::miette!("{}", e))?
32 .as_bytes()
33 .to_vec();
34 let existing_announcement = DATABASE.get_announcement(&public_key_bytes).ok().flatten();
35
36 let ticket = self
37 .create_document_ticket(namespace_id, &ShareMode::Read)
38 .await?
39 .to_bytes();
40 let newest_timestamp = self
41 .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
42 .await? as i64;
43
44 let mutable_item = match existing_announcement {
46 None => {
47 debug!(
48 "Prior announcement not found in database for replica {} … ",
49 crate::fs::util::fmt(namespace_id)
50 );
51 let replica_private_key = mainline::SigningKey::from_bytes(
53 &self
54 .create_document_ticket(namespace_id, &ShareMode::Write)
55 .await?
56 .capability
57 .secret_key()
58 .into_diagnostic()?
59 .to_bytes(),
60 );
61 mainline::MutableItem::new(replica_private_key, &ticket, newest_timestamp, None)
62 }
63 Some(announcement) => mainline::MutableItem::new_signed_unchecked(
64 announcement.key.try_into().map_err(|_e| {
65 miette::miette!("Replica announcement key does not fit into 32 bytes … ")
66 })?,
67 announcement.signature.try_into().map_err(|_e| {
68 miette::miette!("Replica announcement signature does not fit into 64 bytes … ")
69 })?,
70 &ticket,
71 newest_timestamp,
72 None,
73 ),
74 };
75 let replica_announcement = ReplicaAnnouncement {
76 key: mutable_item.key().to_vec(),
77 signature: mutable_item.signature().to_vec(),
78 };
79 match self.dht.put_mutable(mutable_item, None).await {
80 Ok(_) => {
81 info!(
82 "Announced replica {} … ",
83 crate::fs::util::fmt(namespace_id)
84 );
85 if let Err(e) = DATABASE.upsert_announcement(&replica_announcement) {
86 error!("{e}");
87 }
88 }
89 Err(e) => error!(
90 "{}",
91 OkuDiscoveryError::ProblemAnnouncingContent(
92 crate::fs::util::fmt(namespace_id),
93 e.to_string()
94 )
95 ),
96 }
97 Ok(*namespace_id)
98 }
99
100 pub async fn announce_replicas(&self) -> miette::Result<()> {
102 let mut future_set = JoinSet::new();
103
104 let replicas = self.list_replicas().await?;
106 for (replica, _capability_kind, _is_home_replica) in replicas {
107 let self_clone = self.clone();
108 future_set.spawn(async move { self_clone.announce_replica(&replica).await });
109 }
110 info!("Pending announcements: {} … ", future_set.len());
111 while let Some(res) = future_set.join_next().await {
113 match res {
114 Ok(result) => match result {
115 Ok(_) => (),
116 Err(e) => error!("{}", e),
117 },
118 Err(e) => error!("{}", e),
119 }
120 }
121
122 Ok(())
123 }
124}
125
126pub fn to_infohash(haf: &HashAndFormat) -> mainline::Id {
132 let mut data = [0u8; 20];
133 data.copy_from_slice(&haf.hash.as_bytes()[..20]);
134 mainline::Id::from_bytes(data).unwrap()
135}