// oku_fs/fs/replica.rs

use super::*;
use crate::config::OkuFsConfig;
use crate::database::core::DATABASE;
use crate::database::dht::ReplicaAnnouncement;
use crate::error::{OkuDiscoveryError, OkuFsError, OkuFuseError};
use anyhow::anyhow;
use futures::{pin_mut, Stream, StreamExt};
use iroh_base::ticket::Ticket;
use iroh_docs::engine::LiveEvent;
use iroh_docs::rpc::client::docs::ShareMode;
use iroh_docs::rpc::AddrInfoOptions;
use iroh_docs::store::FilterKind;
use iroh_docs::sync::CapabilityKind;
use iroh_docs::DocTicket;
use iroh_docs::NamespaceId;
use log::{error, info};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::path::PathBuf;
use util::{merge_tickets, path_to_entry_prefix};
20
21impl OkuFs {
22    /// Creates a new replica in the file system.
23    ///
24    /// # Returns
25    ///
26    /// The ID of the new replica, being its public key.
27    pub async fn create_replica(&self) -> miette::Result<NamespaceId> {
28        let docs_client = &self.docs.client();
29        let new_document = docs_client.create().await.map_err(|e| {
30            error!("{}", e);
31            OkuFsError::CannotCreateReplica
32        })?;
33        let document_id = new_document.id();
34        new_document.close().await.map_err(|e| {
35            error!("{}", e);
36            OkuFsError::CannotExitReplica
37        })?;
38        self.replica_sender.send_replace(());
39        Ok(document_id)
40    }
41
42    /// Deletes a replica from the file system.
43    ///
44    /// # Arguments
45    ///
46    /// * `namespace_id` - The ID of the replica to delete.
47    pub async fn delete_replica(&self, namespace_id: &NamespaceId) -> miette::Result<()> {
48        let docs_client = &self.docs.client();
49        self.replica_sender.send_replace(());
50        Ok(docs_client.drop_doc(*namespace_id).await.map_err(|e| {
51            error!("{}", e);
52            OkuFsError::CannotDeleteReplica
53        })?)
54    }
55
56    /// Lists all replicas in the file system.
57    ///
58    /// # Returns
59    ///
60    /// A list of all replicas in the file system.
61    pub async fn list_replicas(&self) -> miette::Result<Vec<(NamespaceId, CapabilityKind)>> {
62        let docs_client = &self.docs.client();
63        let replicas = docs_client.list().await.map_err(|e| {
64            error!("{}", e);
65            OkuFsError::CannotListReplicas
66        })?;
67        pin_mut!(replicas);
68        let mut replica_ids: Vec<(NamespaceId, CapabilityKind)> = replicas
69            .filter_map(|replica| async move { replica.ok() })
70            .collect()
71            .await;
72
73        let config = OkuFsConfig::load_or_create_config()?;
74        if let Some(home_replica) = config.home_replica()? {
75            replica_ids.sort_unstable_by_key(|(namespace_id, capability_kind)| {
76                (
77                    *namespace_id != home_replica,
78                    !matches!(capability_kind, CapabilityKind::Write),
79                )
80            });
81        } else {
82            replica_ids.sort_unstable_by_key(|(_namespace_id, capability_kind)| {
83                !matches!(capability_kind, CapabilityKind::Write)
84            });
85        }
86        Ok(replica_ids)
87    }
88
89    /// Retrieves the permissions for a local replica.
90    ///
91    /// # Arguments
92    ///
93    /// * `namespace_id` - The ID of the replica.
94    ///
95    /// # Returns
96    ///
97    /// If either the replica can be read from & written to, or if it can only be read from.
98    pub async fn get_replica_capability(
99        &self,
100        namespace_id: &NamespaceId,
101    ) -> miette::Result<CapabilityKind> {
102        let replicas_vec = self.list_replicas().await?;
103        match replicas_vec
104            .par_iter()
105            .find_any(|replica| replica.0 == *namespace_id)
106        {
107            Some(replica) => Ok(replica.1),
108            None => Err(OkuFuseError::NoReplica(crate::fs::util::fmt(namespace_id)).into()),
109        }
110    }
111
112    /// Join a swarm to fetch the latest version of a replica and save it to the local machine.
113    ///
114    /// # Arguments
115    ///
116    /// * `namespace_id` - The ID of the replica to fetch.
117    ///
118    /// * `path` - An optional path of requested files within the replica.
119    pub async fn fetch_replica_by_id(
120        &self,
121        namespace_id: &NamespaceId,
122        path: &Option<PathBuf>,
123    ) -> anyhow::Result<()> {
124        let ticket = self.resolve_namespace_id(namespace_id).await?;
125        let docs_client = self.docs.client();
126        let replica_sender = self.replica_sender.clone();
127        match path.clone() {
128            Some(path) => {
129                let replica = docs_client.import_namespace(ticket.capability).await?;
130                let filter = FilterKind::Prefix(path_to_entry_prefix(&path));
131                replica
132                    .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(vec![
133                        filter,
134                    ]))
135                    .await?;
136                replica.start_sync(ticket.nodes).await?;
137                let mut events = replica.subscribe().await?;
138                let sync_start = std::time::Instant::now();
139                while let Some(event) = events.next().await {
140                    if matches!(event?, LiveEvent::SyncFinished(_)) {
141                        let elapsed = sync_start.elapsed();
142                        info!(
143                            "Synchronisation took {elapsed:?} for {} … ",
144                            crate::fs::util::fmt(namespace_id),
145                        );
146                        break;
147                    }
148                }
149            }
150            None => {
151                if let Some(replica) = docs_client.open(*namespace_id).await.unwrap_or(None) {
152                    replica
153                        .set_download_policy(iroh_docs::store::DownloadPolicy::default())
154                        .await?;
155                    replica.start_sync(ticket.nodes).await?;
156                    let mut events = replica.subscribe().await?;
157                    let sync_start = std::time::Instant::now();
158                    while let Some(event) = events.next().await {
159                        if matches!(event?, LiveEvent::SyncFinished(_)) {
160                            let elapsed = sync_start.elapsed();
161                            info!(
162                                "Synchronisation took {elapsed:?} for {} … ",
163                                crate::fs::util::fmt(namespace_id),
164                            );
165                            break;
166                        }
167                    }
168                } else {
169                    let (_replica, mut events) = docs_client.import_and_subscribe(ticket).await?;
170                    let sync_start = std::time::Instant::now();
171                    while let Some(event) = events.next().await {
172                        if matches!(event?, LiveEvent::SyncFinished(_)) {
173                            let elapsed = sync_start.elapsed();
174                            info!(
175                                "Synchronisation took {elapsed:?} for {} … ",
176                                crate::fs::util::fmt(namespace_id),
177                            );
178                            break;
179                        }
180                    }
181                }
182            }
183        }
184        replica_sender.send_replace(());
185        Ok(())
186    }
187
188    /// Join a swarm to fetch the latest version of a replica and save it to the local machine.
189    ///
190    /// # Arguments
191    ///
192    /// * `ticket` - A ticket for the replica to fetch.
193    ///
194    /// * `path` - An optional path of requested files within the replica.
195    ///
196    /// # Returns
197    ///
198    /// A handle to the replica.
199    pub async fn fetch_replica_by_ticket(
200        &self,
201        ticket: &DocTicket,
202        path: &Option<PathBuf>,
203        filters: &Option<Vec<FilterKind>>,
204    ) -> anyhow::Result<()> {
205        let namespace_id = ticket.capability.id();
206        let docs_client = self.docs.client();
207        let replica_sender = self.replica_sender.clone();
208        match path.clone() {
209            Some(path) => {
210                let replica = docs_client
211                    .import_namespace(ticket.capability.clone())
212                    .await?;
213                let filters = filters
214                    .clone()
215                    .unwrap_or(vec![FilterKind::Prefix(path_to_entry_prefix(&path))]);
216                replica
217                    .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(filters))
218                    .await?;
219                replica.start_sync(ticket.nodes.clone()).await?;
220                let mut events = replica.subscribe().await?;
221                let sync_start = std::time::Instant::now();
222                while let Some(event) = events.next().await {
223                    if matches!(event?, LiveEvent::SyncFinished(_)) {
224                        let elapsed = sync_start.elapsed();
225                        info!(
226                            "Synchronisation took {elapsed:?} for {} … ",
227                            crate::fs::util::fmt(namespace_id),
228                        );
229                        break;
230                    }
231                }
232            }
233            None => {
234                if let Some(replica) = docs_client.open(namespace_id).await.unwrap_or(None) {
235                    replica
236                        .set_download_policy(iroh_docs::store::DownloadPolicy::default())
237                        .await?;
238                    replica.start_sync(ticket.nodes.clone()).await?;
239                    let mut events = replica.subscribe().await?;
240                    let sync_start = std::time::Instant::now();
241                    while let Some(event) = events.next().await {
242                        if matches!(event?, LiveEvent::SyncFinished(_)) {
243                            let elapsed = sync_start.elapsed();
244                            info!(
245                                "Synchronisation took {elapsed:?} for {} … ",
246                                crate::fs::util::fmt(namespace_id),
247                            );
248                            break;
249                        }
250                    }
251                } else {
252                    let (_replica, mut events) =
253                        docs_client.import_and_subscribe(ticket.clone()).await?;
254                    let sync_start = std::time::Instant::now();
255                    while let Some(event) = events.next().await {
256                        if matches!(event?, LiveEvent::SyncFinished(_)) {
257                            let elapsed = sync_start.elapsed();
258                            info!(
259                                "Synchronisation took {elapsed:?} for {} … ",
260                                crate::fs::util::fmt(namespace_id),
261                            );
262                            break;
263                        }
264                    }
265                }
266            }
267        };
268        replica_sender.send_replace(());
269        Ok(())
270    }
271
272    /// Join a swarm to fetch the latest version of a replica and save it to the local machine.
273    ///
274    /// If a version of the replica already exists locally, only the last-fetched paths will be fetched.
275    ///
276    /// # Arguments
277    ///
278    /// * `namespace_id` - The ID of the replica to fetch.
279    pub async fn sync_replica(&self, namespace_id: &NamespaceId) -> anyhow::Result<()> {
280        let ticket = self.resolve_namespace_id(namespace_id).await?;
281        let docs_client = self.docs.client();
282        let replica_sender = self.replica_sender.clone();
283        let (_replica, mut events) = docs_client.import_and_subscribe(ticket).await?;
284        let sync_start = std::time::Instant::now();
285        while let Some(event) = events.next().await {
286            if matches!(event?, LiveEvent::SyncFinished(_)) {
287                let elapsed = sync_start.elapsed();
288                info!(
289                    "Synchronisation took {elapsed:?} for {} … ",
290                    crate::fs::util::fmt(namespace_id),
291                );
292                break;
293            }
294        }
295        replica_sender.send_replace(());
296        Ok(())
297    }
298
299    /// Use the mainline DHT to obtain a ticket for the replica with the given ID.
300    ///
301    /// # Arguments
302    ///
303    /// * `namespace_id` - The ID of the replica to fetch.
304    ///
305    /// # Returns
306    ///
307    /// A ticket for the replica with the given ID.
308    pub async fn resolve_namespace_id(
309        &self,
310        namespace_id: &NamespaceId,
311    ) -> anyhow::Result<DocTicket> {
312        let get_stream = self.dht.get_mutable(namespace_id.as_bytes(), None, None)?;
313        tokio::pin!(get_stream);
314        let mut tickets = Vec::new();
315        while let Some(mutable_item) = get_stream.next().await {
316            let _ = DATABASE.upsert_announcement(&ReplicaAnnouncement {
317                key: mutable_item.key().to_vec(),
318                signature: mutable_item.signature().to_vec(),
319            });
320            tickets.push(DocTicket::from_bytes(mutable_item.value())?)
321        }
322        merge_tickets(&tickets).ok_or(anyhow!(
323            "Could not find tickets for {} … ",
324            crate::fs::util::fmt(namespace_id)
325        ))
326    }
327
328    /// Create a sharing ticket for a given replica.
329    ///
330    /// # Arguments
331    ///
332    /// * `namespace_id` - The ID of the replica to share.
333    ///
334    /// * `share_mode` - Whether the replica should be shared as read-only, or if read & write permissions are to be shared.
335    ///
336    /// # Returns
337    ///
338    /// A ticket to retrieve the given replica with the requested permissions.
339    pub async fn create_document_ticket(
340        &self,
341        namespace_id: &NamespaceId,
342        share_mode: &ShareMode,
343    ) -> miette::Result<DocTicket> {
344        if matches!(share_mode, ShareMode::Write)
345            && matches!(
346                self.get_replica_capability(namespace_id).await?,
347                CapabilityKind::Read
348            )
349        {
350            Err(OkuFsError::CannotShareReplicaWritable(*namespace_id).into())
351        } else {
352            let docs_client = &self.docs.client();
353            let document = docs_client
354                .open(*namespace_id)
355                .await
356                .map_err(|e| {
357                    error!("{}", e);
358                    OkuFsError::CannotOpenReplica
359                })?
360                .ok_or(OkuFsError::FsEntryNotFound)?;
361            Ok(document
362                .share(share_mode.clone(), AddrInfoOptions::RelayAndAddresses)
363                .await
364                .map_err(|e| {
365                    error!("{}", e);
366                    OkuDiscoveryError::CannotGenerateSharingTicket
367                })?)
368        }
369    }
370}