1use crate::database::core::DATABASE;
2use crate::{error::OkuDiscoveryError, fs::OkuFs};
3use iroh_base::ticket::Ticket;
4use iroh_blobs::HashAndFormat;
5use iroh_docs::rpc::client::docs::ShareMode;
6use iroh_docs::sync::CapabilityKind;
7use iroh_docs::NamespaceId;
8use log::{error, info};
9use miette::IntoDiagnostic;
10use std::{path::PathBuf, time::Duration};
11use tokio::task::JoinSet;
12
/// A delay of one hour.
///
/// NOTE(review): presumably the interval between successive re-announcements of
/// replicas; the code that consumes this constant is not visible here — confirm.
pub const REPUBLISH_DELAY: Duration = Duration::from_secs(3_600);

/// A delay of half a second.
///
/// NOTE(review): presumably the pause before the first round of announcements;
/// the code that consumes this constant is not visible here — confirm.
pub const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);
18
19impl OkuFs {
20 pub async fn announce_mutable_replica(
26 &self,
27 namespace_id: &NamespaceId,
28 ) -> miette::Result<NamespaceId> {
29 let ticket = self
30 .create_document_ticket(namespace_id, &ShareMode::Read)
31 .await?
32 .to_bytes();
33 let newest_timestamp = self
34 .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
35 .await? as i64;
36 let replica_private_key = mainline::SigningKey::from_bytes(
37 &self
38 .create_document_ticket(namespace_id, &ShareMode::Write)
39 .await?
40 .capability
41 .secret_key()
42 .into_diagnostic()?
43 .to_bytes(),
44 );
45 let mutable_item =
46 mainline::MutableItem::new(replica_private_key, &ticket, newest_timestamp, None);
47 match self.dht.put_mutable(mutable_item, None).await {
48 Ok(_) => info!(
49 "Announced mutable replica {} … ",
50 crate::fs::util::fmt(namespace_id)
51 ),
52 Err(e) => error!(
53 "{}",
54 OkuDiscoveryError::ProblemAnnouncingContent(
55 crate::fs::util::fmt(namespace_id),
56 e.to_string()
57 )
58 ),
59 }
60 Ok(*namespace_id)
61 }
62
63 pub async fn announce_immutable_replica(
69 &self,
70 namespace_id: &NamespaceId,
71 ) -> miette::Result<NamespaceId> {
72 let public_key_bytes = namespace_id
73 .into_public_key()
74 .map_err(|e| miette::miette!("{}", e))?
75 .as_bytes()
76 .to_vec();
77 let announcement = DATABASE
78 .get_announcement(&public_key_bytes)
79 .ok()
80 .flatten()
81 .ok_or(miette::miette!(
82 "Prior announcement not found in database for replica {} … ",
83 crate::fs::util::fmt(namespace_id)
84 ))?;
85
86 let ticket = self
87 .create_document_ticket(namespace_id, &ShareMode::Read)
88 .await?
89 .to_bytes();
90 let newest_timestamp = self
91 .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
92 .await? as i64;
93 let mutable_item = mainline::MutableItem::new_signed_unchecked(
94 announcement.key.try_into().map_err(|_e| {
95 miette::miette!("Replica announcement key does not fit into 32 bytes … ")
96 })?,
97 announcement.signature.try_into().map_err(|_e| {
98 miette::miette!("Replica announcement signature does not fit into 64 bytes … ")
99 })?,
100 &ticket,
101 newest_timestamp,
102 None,
103 );
104 match self.dht.put_mutable(mutable_item, None).await {
105 Ok(_) => info!(
106 "Announced immutable replica {} … ",
107 crate::fs::util::fmt(namespace_id)
108 ),
109 Err(e) => error!(
110 "{}",
111 OkuDiscoveryError::ProblemAnnouncingContent(
112 crate::fs::util::fmt(namespace_id),
113 e.to_string()
114 )
115 ),
116 }
117 Ok(*namespace_id)
118 }
119
120 pub async fn announce_replica(
128 &self,
129 namespace_id: &NamespaceId,
130 capability_kind: &CapabilityKind,
131 ) -> miette::Result<NamespaceId> {
132 match capability_kind {
133 CapabilityKind::Read => self.announce_immutable_replica(namespace_id).await,
134 CapabilityKind::Write => self.announce_mutable_replica(namespace_id).await,
135 }
136 }
137
138 pub async fn announce_home_replica(&self) -> miette::Result<NamespaceId> {
140 let home_replica = self
141 .home_replica()
142 .await
143 .ok_or(miette::miette!("No home replica set … "))?;
144 let ticket = self
145 .create_document_ticket(&home_replica, &ShareMode::Read)
146 .await?
147 .to_bytes();
148 let newest_timestamp = self
149 .get_newest_timestamp_in_folder(&home_replica, &PathBuf::from("/"))
150 .await? as i64;
151 let author_private_key = mainline::SigningKey::from_bytes(
152 &self
153 .get_author()
154 .await
155 .map_err(|e| miette::miette!("{}", e))?
156 .to_bytes(),
157 );
158 let mutable_item =
159 mainline::MutableItem::new(author_private_key, &ticket, newest_timestamp, None);
160 match self.dht.put_mutable(mutable_item, None).await {
161 Ok(_) => info!(
162 "Announced home replica {} … ",
163 crate::fs::util::fmt(home_replica)
164 ),
165 Err(e) => error!(
166 "{}",
167 OkuDiscoveryError::ProblemAnnouncingContent(
168 crate::fs::util::fmt(home_replica),
169 e.to_string()
170 )
171 ),
172 }
173 Ok(home_replica)
174 }
175
176 pub async fn announce_replicas(&self) -> miette::Result<()> {
178 let mut future_set = JoinSet::new();
179
180 let self_clone = self.clone();
182 future_set.spawn(async move { self_clone.announce_home_replica().await });
183
184 let replicas = self.list_replicas().await?;
186 for (replica, capability_kind) in replicas {
187 let self_clone = self.clone();
188 future_set.spawn(async move {
189 self_clone
190 .announce_replica(&replica, &capability_kind)
191 .await
192 });
193 }
194 info!("Pending announcements: {} … ", future_set.len());
195 while let Some(res) = future_set.join_next().await {
197 match res {
198 Ok(result) => match result {
199 Ok(_) => (),
200 Err(e) => error!("{}", e),
201 },
202 Err(e) => error!("{}", e),
203 }
204 }
205
206 Ok(())
207 }
208}
209
210pub fn to_infohash(haf: &HashAndFormat) -> mainline::Id {
216 let mut data = [0u8; 20];
217 data.copy_from_slice(&haf.hash.as_bytes()[..20]);
218 mainline::Id::from_bytes(data).unwrap()
219}