1use crate::database::core::DATABASE;
2use crate::{error::OkuDiscoveryError, fs::OkuFs};
3use iroh_base::ticket::Ticket;
4use iroh_blobs::HashAndFormat;
5use iroh_docs::rpc::client::docs::ShareMode;
6use iroh_docs::sync::CapabilityKind;
7use iroh_docs::NamespaceId;
8use log::{error, info};
9use miette::IntoDiagnostic;
10use std::{path::PathBuf, time::Duration};
11use tokio::task::JoinSet;
12
13pub const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);
15
16pub const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);
18
19impl OkuFs {
20 pub async fn announce_mutable_replica(
26 &self,
27 namespace_id: &NamespaceId,
28 ) -> miette::Result<NamespaceId> {
29 let ticket = mainline::Bytes::from(
30 self.create_document_ticket(namespace_id, &ShareMode::Read)
31 .await?
32 .to_bytes(),
33 );
34 let newest_timestamp = self
35 .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
36 .await? as i64;
37 let replica_private_key = mainline::SigningKey::from_bytes(
38 &self
39 .create_document_ticket(namespace_id, &ShareMode::Write)
40 .await?
41 .capability
42 .secret_key()
43 .into_diagnostic()?
44 .to_bytes(),
45 );
46 let mutable_item =
47 mainline::MutableItem::new(replica_private_key, ticket, newest_timestamp, None);
48 match self.dht.put_mutable(mutable_item).await {
49 Ok(_) => info!(
50 "Announced mutable replica {} … ",
51 crate::fs::util::fmt(namespace_id)
52 ),
53 Err(e) => error!(
54 "{}",
55 OkuDiscoveryError::ProblemAnnouncingContent(
56 crate::fs::util::fmt(namespace_id),
57 e.to_string()
58 )
59 ),
60 }
61 Ok(*namespace_id)
62 }
63
64 pub async fn announce_immutable_replica(
70 &self,
71 namespace_id: &NamespaceId,
72 ) -> miette::Result<NamespaceId> {
73 let public_key_bytes = namespace_id
74 .into_public_key()
75 .map_err(|e| miette::miette!("{}", e))?
76 .as_bytes()
77 .to_vec();
78 let announcement = DATABASE
79 .get_announcement(&public_key_bytes)
80 .ok()
81 .flatten()
82 .ok_or(miette::miette!(
83 "Prior announcement not found in database for replica {} … ",
84 crate::fs::util::fmt(namespace_id)
85 ))?;
86
87 let ticket = mainline::Bytes::from(
88 self.create_document_ticket(namespace_id, &ShareMode::Read)
89 .await?
90 .to_bytes(),
91 );
92 let newest_timestamp = self
93 .get_newest_timestamp_in_folder(namespace_id, &PathBuf::from("/"))
94 .await? as i64;
95 let mutable_item = mainline::MutableItem::new_signed_unchecked(
96 announcement.key.try_into().map_err(|_e| {
97 miette::miette!("Replica announcement key does not fit into 32 bytes … ")
98 })?,
99 announcement.signature.try_into().map_err(|_e| {
100 miette::miette!("Replica announcement signature does not fit into 64 bytes … ")
101 })?,
102 ticket,
103 newest_timestamp,
104 None,
105 );
106 match self.dht.put_mutable(mutable_item).await {
107 Ok(_) => info!(
108 "Announced immutable replica {} … ",
109 crate::fs::util::fmt(namespace_id)
110 ),
111 Err(e) => error!(
112 "{}",
113 OkuDiscoveryError::ProblemAnnouncingContent(
114 crate::fs::util::fmt(namespace_id),
115 e.to_string()
116 )
117 ),
118 }
119 Ok(*namespace_id)
120 }
121
122 pub async fn announce_replica(
130 &self,
131 namespace_id: &NamespaceId,
132 capability_kind: &CapabilityKind,
133 ) -> miette::Result<NamespaceId> {
134 match capability_kind {
135 CapabilityKind::Read => self.announce_immutable_replica(namespace_id).await,
136 CapabilityKind::Write => self.announce_mutable_replica(namespace_id).await,
137 }
138 }
139
140 pub async fn announce_home_replica(&self) -> miette::Result<NamespaceId> {
142 let home_replica = self
143 .home_replica()
144 .await
145 .ok_or(miette::miette!("No home replica set … "))?;
146 let ticket = mainline::Bytes::from(
147 self.create_document_ticket(&home_replica, &ShareMode::Read)
148 .await?
149 .to_bytes(),
150 );
151 let newest_timestamp = self
152 .get_newest_timestamp_in_folder(&home_replica, &PathBuf::from("/"))
153 .await? as i64;
154 let author_private_key = mainline::SigningKey::from_bytes(
155 &self
156 .get_author()
157 .await
158 .map_err(|e| miette::miette!("{}", e))?
159 .to_bytes(),
160 );
161 let mutable_item =
162 mainline::MutableItem::new(author_private_key, ticket, newest_timestamp, None);
163 match self.dht.put_mutable(mutable_item).await {
164 Ok(_) => info!(
165 "Announced home replica {} … ",
166 crate::fs::util::fmt(home_replica)
167 ),
168 Err(e) => error!(
169 "{}",
170 OkuDiscoveryError::ProblemAnnouncingContent(
171 crate::fs::util::fmt(home_replica),
172 e.to_string()
173 )
174 ),
175 }
176 Ok(home_replica)
177 }
178
179 pub async fn announce_replicas(&self) -> miette::Result<()> {
181 let mut future_set = JoinSet::new();
182
183 let self_clone = self.clone();
185 future_set.spawn(async move { self_clone.announce_home_replica().await });
186
187 let replicas = self.list_replicas().await?;
189 for (replica, capability_kind) in replicas {
190 let self_clone = self.clone();
191 future_set.spawn(async move {
192 self_clone
193 .announce_replica(&replica, &capability_kind)
194 .await
195 });
196 }
197 info!("Pending announcements: {} … ", future_set.len());
198 while let Some(res) = future_set.join_next().await {
200 match res {
201 Ok(result) => match result {
202 Ok(_) => (),
203 Err(e) => error!("{}", e),
204 },
205 Err(e) => error!("{}", e),
206 }
207 }
208
209 Ok(())
210 }
211}
212
213pub fn to_infohash(haf: &HashAndFormat) -> mainline::Id {
219 let mut data = [0u8; 20];
220 data.copy_from_slice(&haf.hash.as_bytes()[..20]);
221 mainline::Id::from_bytes(data).unwrap()
222}