1use super::*;
2use crate::database::core::DATABASE;
3use crate::database::dht::ReplicaAnnouncement;
4use crate::error::{OkuDiscoveryError, OkuFsError, OkuFuseError};
5use anyhow::anyhow;
6use futures::{pin_mut, StreamExt};
7use iroh_docs::api::protocol::AddrInfoOptions;
8use iroh_docs::api::protocol::ShareMode;
9use iroh_docs::engine::LiveEvent;
10use iroh_docs::store::FilterKind;
11use iroh_docs::sync::CapabilityKind;
12use iroh_docs::NamespaceId;
13use iroh_docs::{AuthorId, DocTicket};
14use iroh_tickets::Ticket;
15use log::{error, info};
16use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
17use std::collections::HashSet;
18use std::path::PathBuf;
19use util::{merge_tickets, path_to_entry_prefix};
20
21impl OkuFs {
22 pub async fn create_replica(&self) -> miette::Result<NamespaceId> {
28 let docs_client = &self.docs;
29 let new_document = docs_client.create().await.map_err(|e| {
30 error!("{}", e);
31 OkuFsError::CannotCreateReplica
32 })?;
33 let document_id = new_document.id();
34 new_document.close().await.map_err(|e| {
35 error!("{}", e);
36 OkuFsError::CannotExitReplica
37 })?;
38 self.replica_sender.send_replace(());
39 Ok(document_id)
40 }
41
42 pub async fn is_home_replica(&self, namespace_id: &NamespaceId) -> bool {
52 let replica_capability = self.get_replica_capability(namespace_id).await;
53 match replica_capability {
54 Err(e) => {
55 error!("{e}");
56 false
57 }
58 Ok(CapabilityKind::Read) => false,
59 Ok(CapabilityKind::Write) => {
60 let authors_list = self.docs.author_list().await;
61 match authors_list {
62 Err(e) => {
63 error!("{e}");
64 false
65 }
66 Ok(authors_list) => {
67 let authors: HashSet<AuthorId> = authors_list
68 .filter_map(|x| async move { x.ok() })
69 .collect()
70 .await;
71 authors.contains(&AuthorId::from(namespace_id.as_bytes()))
72 }
73 }
74 }
75 }
76 }
77
78 pub async fn delete_replica(&self, namespace_id: &NamespaceId) -> miette::Result<()> {
84 let docs_client: &Docs = &self.docs;
85 if self.is_home_replica(namespace_id).await {
86 return Err(miette::miette!(
87 "Cannot delete a home replica (replica ID: {})",
88 crate::fs::util::fmt(namespace_id)
89 ));
90 }
91 self.replica_sender.send_replace(());
92 Ok(docs_client.drop_doc(*namespace_id).await.map_err(|e| {
93 error!("{}", e);
94 OkuFsError::CannotDeleteReplica
95 })?)
96 }
97
98 pub async fn list_replicas(&self) -> miette::Result<Vec<(NamespaceId, CapabilityKind, bool)>> {
104 let docs_client = &self.docs;
105
106 let authors: HashSet<AuthorId> = docs_client
107 .author_list()
108 .await
109 .map_err(|e| miette::miette!(e))?
110 .filter_map(|x| async move { x.ok() })
111 .collect()
112 .await;
113
114 let replicas = docs_client.list().await.map_err(|e| {
115 error!("{}", e);
116 OkuFsError::CannotListReplicas
117 })?;
118 pin_mut!(replicas);
119 let mut replica_ids: Vec<(NamespaceId, CapabilityKind, bool)> = replicas
120 .filter_map(|replica| async {
121 replica.ok().map(|(x, y)| {
122 (
123 x,
124 y,
125 (&authors).contains(&AuthorId::from(x.as_bytes()))
126 && matches!(y, CapabilityKind::Write),
127 )
128 })
129 })
130 .collect()
131 .await;
132
133 replica_ids.sort_unstable_by_key(|(_namespace_id, capability_kind, is_home_replica)| {
134 (
135 !is_home_replica,
136 !matches!(capability_kind, CapabilityKind::Write),
137 )
138 });
139 Ok(replica_ids)
140 }
141
142 pub async fn get_replica_capability(
152 &self,
153 namespace_id: &NamespaceId,
154 ) -> miette::Result<CapabilityKind> {
155 let replicas_vec = self.list_replicas().await?;
156 match replicas_vec
157 .par_iter()
158 .find_any(|replica| replica.0 == *namespace_id)
159 {
160 Some(replica) => Ok(replica.1),
161 None => Err(OkuFuseError::NoReplica(crate::fs::util::fmt(namespace_id)).into()),
162 }
163 }
164
165 pub async fn fetch_replica_by_id(
173 &self,
174 namespace_id: &NamespaceId,
175 path: &Option<PathBuf>,
176 ) -> anyhow::Result<()> {
177 let ticket = self.resolve_namespace_id(namespace_id).await?;
178 let docs_client = &self.docs;
179 let replica_sender = self.replica_sender.clone();
180 match path.clone() {
181 Some(path) => {
182 let replica = docs_client.import_namespace(ticket.capability).await?;
183 let filter = FilterKind::Prefix(path_to_entry_prefix(&path));
184 replica
185 .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(vec![
186 filter,
187 ]))
188 .await?;
189 replica.start_sync(ticket.nodes).await?;
190 let mut events = replica.subscribe().await?;
191 let sync_start = std::time::Instant::now();
192 while let Some(event) = events.next().await {
193 if matches!(event?, LiveEvent::SyncFinished(_)) {
194 let elapsed = sync_start.elapsed();
195 info!(
196 "Synchronisation took {elapsed:?} for {} … ",
197 crate::fs::util::fmt(namespace_id),
198 );
199 break;
200 }
201 }
202 }
203 None => {
204 if let Some(replica) = docs_client.open(*namespace_id).await.unwrap_or(None) {
205 replica
206 .set_download_policy(iroh_docs::store::DownloadPolicy::default())
207 .await?;
208 replica.start_sync(ticket.nodes).await?;
209 let mut events = replica.subscribe().await?;
210 let sync_start = std::time::Instant::now();
211 while let Some(event) = events.next().await {
212 if matches!(event?, LiveEvent::SyncFinished(_)) {
213 let elapsed = sync_start.elapsed();
214 info!(
215 "Synchronisation took {elapsed:?} for {} … ",
216 crate::fs::util::fmt(namespace_id),
217 );
218 break;
219 }
220 }
221 } else {
222 let (_replica, mut events) = docs_client.import_and_subscribe(ticket).await?;
223 let sync_start = std::time::Instant::now();
224 while let Some(event) = events.next().await {
225 if matches!(event?, LiveEvent::SyncFinished(_)) {
226 let elapsed = sync_start.elapsed();
227 info!(
228 "Synchronisation took {elapsed:?} for {} … ",
229 crate::fs::util::fmt(namespace_id),
230 );
231 break;
232 }
233 }
234 }
235 }
236 }
237 replica_sender.send_replace(());
238 Ok(())
239 }
240
241 pub async fn fetch_replica_by_ticket(
253 &self,
254 ticket: &DocTicket,
255 path: &Option<PathBuf>,
256 filters: &Option<Vec<FilterKind>>,
257 ) -> anyhow::Result<()> {
258 let namespace_id = ticket.capability.id();
259 let docs_client = &self.docs;
260 let replica_sender = self.replica_sender.clone();
261 match path.clone() {
262 Some(path) => {
263 let replica = docs_client
264 .import_namespace(ticket.capability.clone())
265 .await?;
266 let filters = filters
267 .clone()
268 .unwrap_or(vec![FilterKind::Prefix(path_to_entry_prefix(&path))]);
269 replica
270 .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(filters))
271 .await?;
272 replica.start_sync(ticket.nodes.clone()).await?;
273 let mut events = replica.subscribe().await?;
274 let sync_start = std::time::Instant::now();
275 while let Some(event) = events.next().await {
276 if matches!(event?, LiveEvent::SyncFinished(_)) {
277 let elapsed = sync_start.elapsed();
278 info!(
279 "Synchronisation took {elapsed:?} for {} … ",
280 crate::fs::util::fmt(namespace_id),
281 );
282 break;
283 }
284 }
285 }
286 None => {
287 if let Some(replica) = docs_client.open(namespace_id).await.unwrap_or(None) {
288 replica
289 .set_download_policy(iroh_docs::store::DownloadPolicy::default())
290 .await?;
291 replica.start_sync(ticket.nodes.clone()).await?;
292 let mut events = replica.subscribe().await?;
293 let sync_start = std::time::Instant::now();
294 while let Some(event) = events.next().await {
295 if matches!(event?, LiveEvent::SyncFinished(_)) {
296 let elapsed = sync_start.elapsed();
297 info!(
298 "Synchronisation took {elapsed:?} for {} … ",
299 crate::fs::util::fmt(namespace_id),
300 );
301 break;
302 }
303 }
304 } else {
305 let (_replica, mut events) =
306 docs_client.import_and_subscribe(ticket.clone()).await?;
307 let sync_start = std::time::Instant::now();
308 while let Some(event) = events.next().await {
309 if matches!(event?, LiveEvent::SyncFinished(_)) {
310 let elapsed = sync_start.elapsed();
311 info!(
312 "Synchronisation took {elapsed:?} for {} … ",
313 crate::fs::util::fmt(namespace_id),
314 );
315 break;
316 }
317 }
318 }
319 }
320 };
321 replica_sender.send_replace(());
322 Ok(())
323 }
324
325 pub async fn sync_replica(&self, namespace_id: &NamespaceId) -> anyhow::Result<()> {
333 let ticket = self.resolve_namespace_id(namespace_id).await?;
334 let docs_client = &self.docs;
335 let replica_sender = self.replica_sender.clone();
336 let (_replica, mut events) = docs_client.import_and_subscribe(ticket).await?;
337 let sync_start = std::time::Instant::now();
338 while let Some(event) = events.next().await {
339 if matches!(event?, LiveEvent::SyncFinished(_)) {
340 let elapsed = sync_start.elapsed();
341 info!(
342 "Synchronisation took {elapsed:?} for {} … ",
343 crate::fs::util::fmt(namespace_id),
344 );
345 break;
346 }
347 }
348 replica_sender.send_replace(());
349 Ok(())
350 }
351
352 pub async fn resolve_namespace_id(
362 &self,
363 namespace_id: &NamespaceId,
364 ) -> anyhow::Result<DocTicket> {
365 let get_stream = self.dht.get_mutable(namespace_id.as_bytes(), None, None);
366 tokio::pin!(get_stream);
367 let mut tickets = Vec::new();
368 while let Some(mutable_item) = get_stream.next().await {
369 let ticket = DocTicket::from_bytes(mutable_item.value())?;
370 let ticket_namespace_id = &ticket.capability.id();
371 if ticket_namespace_id != namespace_id {
372 error!("Ticket is for replica with ID {}, but claims to be for replica with ID {}; ignoring ticket … ", crate::fs::util::fmt(ticket_namespace_id), crate::fs::util::fmt(namespace_id));
373 continue;
374 }
375 if let Err(e) = DATABASE.upsert_announcement(&ReplicaAnnouncement {
376 key: mutable_item.key().to_vec(),
377 signature: mutable_item.signature().to_vec(),
378 }) {
379 error!("{e}");
380 }
381 tickets.push(ticket)
382 }
383 merge_tickets(&tickets).ok_or(anyhow!(
384 "Could not find tickets for {} … ",
385 crate::fs::util::fmt(namespace_id)
386 ))
387 }
388
389 pub async fn create_document_ticket(
401 &self,
402 namespace_id: &NamespaceId,
403 share_mode: &ShareMode,
404 ) -> miette::Result<DocTicket> {
405 if matches!(share_mode, ShareMode::Write)
406 && matches!(
407 self.get_replica_capability(namespace_id).await?,
408 CapabilityKind::Read
409 )
410 {
411 Err(OkuFsError::CannotShareReplicaWritable(*namespace_id).into())
412 } else {
413 let docs_client = &self.docs;
414 let document = docs_client
415 .open(*namespace_id)
416 .await
417 .map_err(|e| {
418 error!("{}", e);
419 OkuFsError::CannotOpenReplica
420 })?
421 .ok_or(OkuFsError::FsEntryNotFound)?;
422 Ok(document
423 .share(share_mode.clone(), AddrInfoOptions::RelayAndAddresses)
424 .await
425 .map_err(|e| {
426 error!("{}", e);
427 OkuDiscoveryError::CannotGenerateSharingTicket
428 })?)
429 }
430 }
431}