Skip to main content

oku_fs/fs/
replica.rs

1use super::*;
2use crate::database::core::DATABASE;
3use crate::database::dht::ReplicaAnnouncement;
4use crate::error::{OkuDiscoveryError, OkuFsError, OkuFuseError};
5use anyhow::anyhow;
6use futures::{pin_mut, StreamExt};
7use iroh_docs::api::protocol::AddrInfoOptions;
8use iroh_docs::api::protocol::ShareMode;
9use iroh_docs::engine::LiveEvent;
10use iroh_docs::store::FilterKind;
11use iroh_docs::sync::CapabilityKind;
12use iroh_docs::NamespaceId;
13use iroh_docs::{AuthorId, DocTicket};
14use iroh_tickets::Ticket;
15use log::{error, info};
16use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
17use std::collections::HashSet;
18use std::path::PathBuf;
19use util::{merge_tickets, path_to_entry_prefix};
20
21impl OkuFs {
22    /// Creates a new replica in the file system.
23    ///
24    /// # Returns
25    ///
26    /// The ID of the new replica, being its public key.
27    pub async fn create_replica(&self) -> miette::Result<NamespaceId> {
28        let docs_client = &self.docs;
29        let new_document = docs_client.create().await.map_err(|e| {
30            error!("{}", e);
31            OkuFsError::CannotCreateReplica
32        })?;
33        let document_id = new_document.id();
34        new_document.close().await.map_err(|e| {
35            error!("{}", e);
36            OkuFsError::CannotExitReplica
37        })?;
38        self.replica_sender.send_replace(());
39        Ok(document_id)
40    }
41
42    /// Returns whether a replica is a home replica.
43    ///
44    /// # Arguments
45    ///
46    /// * `namespace_id` - The ID of the replica.
47    ///
48    /// # Returns
49    ///
50    /// If the given replica is a home replica.
51    pub async fn is_home_replica(&self, namespace_id: &NamespaceId) -> bool {
52        let replica_capability = self.get_replica_capability(namespace_id).await;
53        match replica_capability {
54            Err(e) => {
55                error!("{e}");
56                false
57            }
58            Ok(CapabilityKind::Read) => false,
59            Ok(CapabilityKind::Write) => {
60                let authors_list = self.docs.author_list().await;
61                match authors_list {
62                    Err(e) => {
63                        error!("{e}");
64                        false
65                    }
66                    Ok(authors_list) => {
67                        let authors: HashSet<AuthorId> = authors_list
68                            .filter_map(|x| async move { x.ok() })
69                            .collect()
70                            .await;
71                        authors.contains(&AuthorId::from(namespace_id.as_bytes()))
72                    }
73                }
74            }
75        }
76    }
77
78    /// Deletes a replica from the file system.
79    ///
80    /// # Arguments
81    ///
82    /// * `namespace_id` - The ID of the replica to delete.
83    pub async fn delete_replica(&self, namespace_id: &NamespaceId) -> miette::Result<()> {
84        let docs_client: &Docs = &self.docs;
85        if self.is_home_replica(namespace_id).await {
86            return Err(miette::miette!(
87                "Cannot delete a home replica (replica ID: {})",
88                crate::fs::util::fmt(namespace_id)
89            ));
90        }
91        self.replica_sender.send_replace(());
92        Ok(docs_client.drop_doc(*namespace_id).await.map_err(|e| {
93            error!("{}", e);
94            OkuFsError::CannotDeleteReplica
95        })?)
96    }
97
98    /// Lists all replicas in the file system.
99    ///
100    /// # Returns
101    ///
102    /// A list of all replicas in the file system, including their ID, capability kind (read or write), and if it's a home replica.
103    pub async fn list_replicas(&self) -> miette::Result<Vec<(NamespaceId, CapabilityKind, bool)>> {
104        let docs_client = &self.docs;
105
106        let authors: HashSet<AuthorId> = docs_client
107            .author_list()
108            .await
109            .map_err(|e| miette::miette!(e))?
110            .filter_map(|x| async move { x.ok() })
111            .collect()
112            .await;
113
114        let replicas = docs_client.list().await.map_err(|e| {
115            error!("{}", e);
116            OkuFsError::CannotListReplicas
117        })?;
118        pin_mut!(replicas);
119        let mut replica_ids: Vec<(NamespaceId, CapabilityKind, bool)> = replicas
120            .filter_map(|replica| async {
121                replica.ok().map(|(x, y)| {
122                    (
123                        x,
124                        y,
125                        (&authors).contains(&AuthorId::from(x.as_bytes()))
126                            && matches!(y, CapabilityKind::Write),
127                    )
128                })
129            })
130            .collect()
131            .await;
132
133        replica_ids.sort_unstable_by_key(|(_namespace_id, capability_kind, is_home_replica)| {
134            (
135                !is_home_replica,
136                !matches!(capability_kind, CapabilityKind::Write),
137            )
138        });
139        Ok(replica_ids)
140    }
141
142    /// Retrieves the permissions for a local replica.
143    ///
144    /// # Arguments
145    ///
146    /// * `namespace_id` - The ID of the replica.
147    ///
148    /// # Returns
149    ///
150    /// If either the replica can be read from & written to, or if it can only be read from.
151    pub async fn get_replica_capability(
152        &self,
153        namespace_id: &NamespaceId,
154    ) -> miette::Result<CapabilityKind> {
155        let replicas_vec = self.list_replicas().await?;
156        match replicas_vec
157            .par_iter()
158            .find_any(|replica| replica.0 == *namespace_id)
159        {
160            Some(replica) => Ok(replica.1),
161            None => Err(OkuFuseError::NoReplica(crate::fs::util::fmt(namespace_id)).into()),
162        }
163    }
164
165    /// Join a swarm to fetch the latest version of a replica and save it to the local machine.
166    ///
167    /// # Arguments
168    ///
169    /// * `namespace_id` - The ID of the replica to fetch.
170    ///
171    /// * `path` - An optional path of requested files within the replica.
172    pub async fn fetch_replica_by_id(
173        &self,
174        namespace_id: &NamespaceId,
175        path: &Option<PathBuf>,
176    ) -> anyhow::Result<()> {
177        let ticket = self.resolve_namespace_id(namespace_id).await?;
178        let docs_client = &self.docs;
179        let replica_sender = self.replica_sender.clone();
180        match path.clone() {
181            Some(path) => {
182                let replica = docs_client.import_namespace(ticket.capability).await?;
183                let filter = FilterKind::Prefix(path_to_entry_prefix(&path));
184                replica
185                    .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(vec![
186                        filter,
187                    ]))
188                    .await?;
189                replica.start_sync(ticket.nodes).await?;
190                let mut events = replica.subscribe().await?;
191                let sync_start = std::time::Instant::now();
192                while let Some(event) = events.next().await {
193                    if matches!(event?, LiveEvent::SyncFinished(_)) {
194                        let elapsed = sync_start.elapsed();
195                        info!(
196                            "Synchronisation took {elapsed:?} for {} … ",
197                            crate::fs::util::fmt(namespace_id),
198                        );
199                        break;
200                    }
201                }
202            }
203            None => {
204                if let Some(replica) = docs_client.open(*namespace_id).await.unwrap_or(None) {
205                    replica
206                        .set_download_policy(iroh_docs::store::DownloadPolicy::default())
207                        .await?;
208                    replica.start_sync(ticket.nodes).await?;
209                    let mut events = replica.subscribe().await?;
210                    let sync_start = std::time::Instant::now();
211                    while let Some(event) = events.next().await {
212                        if matches!(event?, LiveEvent::SyncFinished(_)) {
213                            let elapsed = sync_start.elapsed();
214                            info!(
215                                "Synchronisation took {elapsed:?} for {} … ",
216                                crate::fs::util::fmt(namespace_id),
217                            );
218                            break;
219                        }
220                    }
221                } else {
222                    let (_replica, mut events) = docs_client.import_and_subscribe(ticket).await?;
223                    let sync_start = std::time::Instant::now();
224                    while let Some(event) = events.next().await {
225                        if matches!(event?, LiveEvent::SyncFinished(_)) {
226                            let elapsed = sync_start.elapsed();
227                            info!(
228                                "Synchronisation took {elapsed:?} for {} … ",
229                                crate::fs::util::fmt(namespace_id),
230                            );
231                            break;
232                        }
233                    }
234                }
235            }
236        }
237        replica_sender.send_replace(());
238        Ok(())
239    }
240
241    /// Join a swarm to fetch the latest version of a replica and save it to the local machine.
242    ///
243    /// # Arguments
244    ///
245    /// * `ticket` - A ticket for the replica to fetch.
246    ///
247    /// * `path` - An optional path of requested files within the replica.
248    ///
249    /// # Returns
250    ///
251    /// A handle to the replica.
252    pub async fn fetch_replica_by_ticket(
253        &self,
254        ticket: &DocTicket,
255        path: &Option<PathBuf>,
256        filters: &Option<Vec<FilterKind>>,
257    ) -> anyhow::Result<()> {
258        let namespace_id = ticket.capability.id();
259        let docs_client = &self.docs;
260        let replica_sender = self.replica_sender.clone();
261        match path.clone() {
262            Some(path) => {
263                let replica = docs_client
264                    .import_namespace(ticket.capability.clone())
265                    .await?;
266                let filters = filters
267                    .clone()
268                    .unwrap_or(vec![FilterKind::Prefix(path_to_entry_prefix(&path))]);
269                replica
270                    .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(filters))
271                    .await?;
272                replica.start_sync(ticket.nodes.clone()).await?;
273                let mut events = replica.subscribe().await?;
274                let sync_start = std::time::Instant::now();
275                while let Some(event) = events.next().await {
276                    if matches!(event?, LiveEvent::SyncFinished(_)) {
277                        let elapsed = sync_start.elapsed();
278                        info!(
279                            "Synchronisation took {elapsed:?} for {} … ",
280                            crate::fs::util::fmt(namespace_id),
281                        );
282                        break;
283                    }
284                }
285            }
286            None => {
287                if let Some(replica) = docs_client.open(namespace_id).await.unwrap_or(None) {
288                    replica
289                        .set_download_policy(iroh_docs::store::DownloadPolicy::default())
290                        .await?;
291                    replica.start_sync(ticket.nodes.clone()).await?;
292                    let mut events = replica.subscribe().await?;
293                    let sync_start = std::time::Instant::now();
294                    while let Some(event) = events.next().await {
295                        if matches!(event?, LiveEvent::SyncFinished(_)) {
296                            let elapsed = sync_start.elapsed();
297                            info!(
298                                "Synchronisation took {elapsed:?} for {} … ",
299                                crate::fs::util::fmt(namespace_id),
300                            );
301                            break;
302                        }
303                    }
304                } else {
305                    let (_replica, mut events) =
306                        docs_client.import_and_subscribe(ticket.clone()).await?;
307                    let sync_start = std::time::Instant::now();
308                    while let Some(event) = events.next().await {
309                        if matches!(event?, LiveEvent::SyncFinished(_)) {
310                            let elapsed = sync_start.elapsed();
311                            info!(
312                                "Synchronisation took {elapsed:?} for {} … ",
313                                crate::fs::util::fmt(namespace_id),
314                            );
315                            break;
316                        }
317                    }
318                }
319            }
320        };
321        replica_sender.send_replace(());
322        Ok(())
323    }
324
325    /// Join a swarm to fetch the latest version of a replica and save it to the local machine.
326    ///
327    /// If a version of the replica already exists locally, only the last-fetched paths will be fetched.
328    ///
329    /// # Arguments
330    ///
331    /// * `namespace_id` - The ID of the replica to fetch.
332    pub async fn sync_replica(&self, namespace_id: &NamespaceId) -> anyhow::Result<()> {
333        let ticket = self.resolve_namespace_id(namespace_id).await?;
334        let docs_client = &self.docs;
335        let replica_sender = self.replica_sender.clone();
336        let (_replica, mut events) = docs_client.import_and_subscribe(ticket).await?;
337        let sync_start = std::time::Instant::now();
338        while let Some(event) = events.next().await {
339            if matches!(event?, LiveEvent::SyncFinished(_)) {
340                let elapsed = sync_start.elapsed();
341                info!(
342                    "Synchronisation took {elapsed:?} for {} … ",
343                    crate::fs::util::fmt(namespace_id),
344                );
345                break;
346            }
347        }
348        replica_sender.send_replace(());
349        Ok(())
350    }
351
352    /// Use the mainline DHT to obtain a ticket for the replica with the given ID.
353    ///
354    /// # Arguments
355    ///
356    /// * `namespace_id` - The ID of the replica to fetch.
357    ///
358    /// # Returns
359    ///
360    /// A ticket for the replica with the given ID.
361    pub async fn resolve_namespace_id(
362        &self,
363        namespace_id: &NamespaceId,
364    ) -> anyhow::Result<DocTicket> {
365        let get_stream = self.dht.get_mutable(namespace_id.as_bytes(), None, None);
366        tokio::pin!(get_stream);
367        let mut tickets = Vec::new();
368        while let Some(mutable_item) = get_stream.next().await {
369            let ticket = DocTicket::from_bytes(mutable_item.value())?;
370            let ticket_namespace_id = &ticket.capability.id();
371            if ticket_namespace_id != namespace_id {
372                error!("Ticket is for replica with ID {}, but claims to be for replica with ID {}; ignoring ticket … ", crate::fs::util::fmt(ticket_namespace_id), crate::fs::util::fmt(namespace_id));
373                continue;
374            }
375            if let Err(e) = DATABASE.upsert_announcement(&ReplicaAnnouncement {
376                key: mutable_item.key().to_vec(),
377                signature: mutable_item.signature().to_vec(),
378            }) {
379                error!("{e}");
380            }
381            tickets.push(ticket)
382        }
383        merge_tickets(&tickets).ok_or(anyhow!(
384            "Could not find tickets for {} … ",
385            crate::fs::util::fmt(namespace_id)
386        ))
387    }
388
389    /// Create a sharing ticket for a given replica.
390    ///
391    /// # Arguments
392    ///
393    /// * `namespace_id` - The ID of the replica to share.
394    ///
395    /// * `share_mode` - Whether the replica should be shared as read-only, or if read & write permissions are to be shared.
396    ///
397    /// # Returns
398    ///
399    /// A ticket to retrieve the given replica with the requested permissions.
400    pub async fn create_document_ticket(
401        &self,
402        namespace_id: &NamespaceId,
403        share_mode: &ShareMode,
404    ) -> miette::Result<DocTicket> {
405        if matches!(share_mode, ShareMode::Write)
406            && matches!(
407                self.get_replica_capability(namespace_id).await?,
408                CapabilityKind::Read
409            )
410        {
411            Err(OkuFsError::CannotShareReplicaWritable(*namespace_id).into())
412        } else {
413            let docs_client = &self.docs;
414            let document = docs_client
415                .open(*namespace_id)
416                .await
417                .map_err(|e| {
418                    error!("{}", e);
419                    OkuFsError::CannotOpenReplica
420                })?
421                .ok_or(OkuFsError::FsEntryNotFound)?;
422            Ok(document
423                .share(share_mode.clone(), AddrInfoOptions::RelayAndAddresses)
424                .await
425                .map_err(|e| {
426                    error!("{}", e);
427                    OkuDiscoveryError::CannotGenerateSharingTicket
428                })?)
429        }
430    }
431}