oku_fs/fs/
file.rs

1use super::*;
2use crate::error::OkuFsError;
3use anyhow::anyhow;
4use bytes::Bytes;
5use futures::{pin_mut, StreamExt};
6use iroh_blobs::Hash;
7use iroh_docs::engine::LiveEvent;
8use iroh_docs::rpc::client::docs::Doc;
9use iroh_docs::rpc::client::docs::Entry;
10use iroh_docs::store::FilterKind;
11use iroh_docs::DocTicket;
12use iroh_docs::NamespaceId;
13use log::{error, info};
14use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
15use std::path::PathBuf;
16use util::path_to_entry_key;
17use util::path_to_entry_prefix;
18
19impl OkuFs {
20    /// Lists files in a replica.
21    ///
22    /// # Arguments
23    ///
24    /// * `namespace_id` - The ID of the replica to list files in.
25    ///
26    /// * `path` - An optional path within the replica.
27    ///
28    /// # Returns
29    ///
30    /// A list of files in the replica.
31    pub async fn list_files(
32        &self,
33        namespace_id: &NamespaceId,
34        path: &Option<PathBuf>,
35    ) -> miette::Result<Vec<Entry>> {
36        let docs_client = &self.docs.client();
37        let document = docs_client
38            .open(*namespace_id)
39            .await
40            .map_err(|e| {
41                error!("{}", e);
42                OkuFsError::CannotOpenReplica
43            })?
44            .ok_or(OkuFsError::FsEntryNotFound)?;
45        let query = if let Some(path) = path {
46            let file_key = path_to_entry_prefix(path);
47            iroh_docs::store::Query::single_latest_per_key()
48                .key_prefix(file_key)
49                .build()
50        } else {
51            iroh_docs::store::Query::single_latest_per_key().build()
52        };
53        let entries = document.get_many(query).await.map_err(|e| {
54            error!("{}", e);
55            OkuFsError::CannotListFiles
56        })?;
57        pin_mut!(entries);
58        let files: Vec<Entry> = entries.map(|entry| entry.unwrap()).collect().await;
59        Ok(files)
60    }
61
62    /// Creates a file (if it does not exist) or modifies an existing file.
63    ///
64    /// # Arguments
65    ///
66    /// * `namespace_id` - The ID of the replica containing the file to create or modify.
67    ///
68    /// * `path` - The path of the file to create or modify.
69    ///
70    /// * `data` - The data to write to the file.
71    ///
72    /// # Returns
73    ///
74    /// The hash of the file.
75    pub async fn create_or_modify_file(
76        &self,
77        namespace_id: &NamespaceId,
78        path: &PathBuf,
79        data: impl Into<Bytes>,
80    ) -> miette::Result<Hash> {
81        let file_key = path_to_entry_key(path);
82        let docs_client = &self.docs.client();
83        let document = docs_client
84            .open(*namespace_id)
85            .await
86            .map_err(|e| {
87                error!("{}", e);
88                OkuFsError::CannotOpenReplica
89            })?
90            .ok_or(OkuFsError::FsEntryNotFound)?;
91        let entry_hash = document
92            .set_bytes(self.default_author().await, file_key, data)
93            .await
94            .map_err(|e| {
95                error!("{}", e);
96                OkuFsError::CannotCreateOrModifyFile
97            })?;
98
99        Ok(entry_hash)
100    }
101
102    /// Deletes a file.
103    ///
104    /// # Arguments
105    ///
106    /// * `namespace_id` - The ID of the replica containing the file to delete.
107    ///
108    /// * `path` - The path of the file to delete.
109    ///
110    /// # Returns
111    ///
112    /// The number of entries deleted in the replica, which should be 1 if the file was successfully deleted.
113    pub async fn delete_file(
114        &self,
115        namespace_id: &NamespaceId,
116        path: &PathBuf,
117    ) -> miette::Result<usize> {
118        let file_key = path_to_entry_key(path);
119        let docs_client = &self.docs.client();
120        let document = docs_client
121            .open(*namespace_id)
122            .await
123            .map_err(|e| {
124                error!("{}", e);
125                OkuFsError::CannotOpenReplica
126            })?
127            .ok_or(OkuFsError::FsEntryNotFound)?;
128        let query = iroh_docs::store::Query::single_latest_per_key()
129            .key_exact(file_key)
130            .build();
131        let entry = document
132            .get_one(query)
133            .await
134            .map_err(|e| {
135                error!("{}", e);
136                OkuFsError::CannotReadFile
137            })?
138            .ok_or(OkuFsError::FsEntryNotFound)?;
139        let entries_deleted = document
140            .del(entry.author(), entry.key().to_vec())
141            .await
142            .map_err(|e| {
143                error!("{}", e);
144                OkuFsError::CannotDeleteFile
145            })?;
146        Ok(entries_deleted)
147    }
148
149    /// Gets an Iroh entry for a file.
150    ///
151    /// # Arguments
152    ///
153    /// * `namespace_id` - The ID of the replica containing the file.
154    ///
155    /// * `path` - The path of the file.
156    ///
157    /// # Returns
158    ///
159    /// The entry representing the file.
160    pub async fn get_entry(
161        &self,
162        namespace_id: &NamespaceId,
163        path: &PathBuf,
164    ) -> miette::Result<Entry> {
165        let file_key = path_to_entry_key(path);
166        let docs_client = &self.docs.client();
167        let document = docs_client
168            .open(*namespace_id)
169            .await
170            .map_err(|e| {
171                error!("{}", e);
172                OkuFsError::CannotOpenReplica
173            })?
174            .ok_or(OkuFsError::FsEntryNotFound)?;
175        let query = iroh_docs::store::Query::single_latest_per_key()
176            .key_exact(file_key)
177            .build();
178        let entry = document
179            .get_one(query)
180            .await
181            .map_err(|e| {
182                error!("{}", e);
183                OkuFsError::CannotReadFile
184            })?
185            .ok_or(OkuFsError::FsEntryNotFound)?;
186        Ok(entry)
187    }
188
189    /// Determines the oldest timestamp of a file.
190    ///
191    /// # Arguments
192    ///
193    /// * `namespace_id` - The ID of the replica containing the file.
194    ///
195    /// * `path` - The path to the file.
196    ///
197    /// # Returns
198    ///
199    /// The timestamp, in microseconds from the Unix epoch, of the oldest entry in the file.
200    pub async fn get_oldest_entry_timestamp(
201        &self,
202        namespace_id: &NamespaceId,
203        path: &PathBuf,
204    ) -> miette::Result<u64> {
205        let file_key = path_to_entry_key(path);
206        let docs_client = &self.docs.client();
207        let document = docs_client
208            .open(*namespace_id)
209            .await
210            .map_err(|e| {
211                error!("{}", e);
212                OkuFsError::CannotOpenReplica
213            })?
214            .ok_or(OkuFsError::FsEntryNotFound)?;
215        let query = iroh_docs::store::Query::all().key_exact(file_key).build();
216        let entries = document.get_many(query).await.map_err(|e| {
217            error!("{}", e);
218            OkuFsError::CannotListFiles
219        })?;
220        pin_mut!(entries);
221        let timestamps: Vec<u64> = entries
222            .map(|entry| entry.unwrap().timestamp())
223            .collect()
224            .await;
225        Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
226    }
227
228    /// Reads a file.
229    ///
230    /// # Arguments
231    ///
232    /// * `namespace_id` - The ID of the replica containing the file to read.
233    ///
234    /// * `path` - The path of the file to read.
235    ///
236    /// # Returns
237    ///
238    /// The data read from the file.
239    pub async fn read_file(
240        &self,
241        namespace_id: &NamespaceId,
242        path: &PathBuf,
243    ) -> miette::Result<Bytes> {
244        let entry = self.get_entry(namespace_id, path).await?;
245        Ok(self.content_bytes(&entry).await.map_err(|e| {
246            error!("{}", e);
247            OkuFsError::CannotReadFile
248        })?)
249    }
250
251    /// Reads a file.
252    ///
253    /// # Arguments
254    ///
255    /// * `document` - A handle to the replica containing the file to read.
256    ///
257    /// * `path` - The path of the file to read.
258    ///
259    /// # Returns
260    ///
261    /// The data read from the file.
262    pub async fn read_file_from_replica_handle(
263        &self,
264        document: &Doc,
265        path: &PathBuf,
266    ) -> miette::Result<Bytes> {
267        let file_key = path_to_entry_key(path);
268        let query = iroh_docs::store::Query::single_latest_per_key()
269            .key_exact(file_key)
270            .build();
271        let entry = document
272            .get_one(query)
273            .await
274            .map_err(|e| miette::miette!("{}", e))?
275            .ok_or(OkuFsError::FsEntryNotFound)?;
276        self.content_bytes(&entry)
277            .await
278            .map_err(|e| miette::miette!("{}", e))
279    }
280
281    /// Moves a file by copying it to a new location and deleting the original.
282    ///
283    /// # Arguments
284    ///
285    /// * `from_namespace_id` - The ID of the replica containing the file to move.
286    ///
287    /// * `to_namespace_id` - The ID of the replica to move the file to.
288    ///
289    /// * `from_path` - The path of the file to move.
290    ///
291    /// * `to_path` - The path to move the file to.
292    ///
293    /// # Returns
294    ///
295    /// A tuple containing the hash of the file at the new destination and the number of replica entries deleted during the operation, which should be 1 if the file at the original path was deleted.
296    pub async fn move_file(
297        &self,
298        from_namespace_id: &NamespaceId,
299        from_path: &PathBuf,
300        to_namespace_id: &NamespaceId,
301        to_path: &PathBuf,
302    ) -> miette::Result<(Hash, usize)> {
303        let data = self.read_file(from_namespace_id, from_path).await?;
304        let hash = self
305            .create_or_modify_file(to_namespace_id, to_path, data)
306            .await?;
307        let entries_deleted = self.delete_file(from_namespace_id, from_path).await?;
308        Ok((hash, entries_deleted))
309    }
310
311    /// Retrieve a file locally after attempting to retrieve the latest version from the Internet.
312    ///
313    /// # Arguments
314    ///
315    /// * `namespace_id` - The ID of the replica containing the file to retrieve.
316    ///
317    /// * `path` - The path to the file to retrieve.
318    ///
319    /// # Returns
320    ///
321    /// The data read from the file.
322    pub async fn fetch_file(
323        &self,
324        namespace_id: &NamespaceId,
325        path: &PathBuf,
326        filters: &Option<Vec<FilterKind>>,
327    ) -> anyhow::Result<Bytes> {
328        match self.resolve_namespace_id(namespace_id).await {
329            Ok(ticket) => match self.fetch_file_with_ticket(&ticket, path, filters).await {
330                Ok(bytes) => Ok(bytes),
331                Err(e) => {
332                    error!("{}", e);
333                    Ok(self
334                        .read_file(namespace_id, path)
335                        .await
336                        .map_err(|e| anyhow!("{}", e))?)
337                }
338            },
339            Err(e) => {
340                error!("{}", e);
341                Ok(self
342                    .read_file(namespace_id, path)
343                    .await
344                    .map_err(|e| anyhow!("{}", e))?)
345            }
346        }
347    }
348
349    /// Join a swarm to fetch the latest version of a file and save it to the local machine.
350    ///
351    /// # Arguments
352    ///
353    /// * `ticket` - A ticket for the replica containing the file to retrieve.
354    ///
355    /// * `path` - The path to the file to retrieve.
356    ///
357    /// # Returns
358    ///
359    /// The data read from the file.
360    pub async fn fetch_file_with_ticket(
361        &self,
362        ticket: &DocTicket,
363        path: &PathBuf,
364        filters: &Option<Vec<FilterKind>>,
365    ) -> anyhow::Result<Bytes> {
366        let docs_client = &self.docs.client();
367        let replica = docs_client
368            .import_namespace(ticket.capability.clone())
369            .await?;
370        let filters = filters
371            .clone()
372            .unwrap_or(vec![FilterKind::Exact(path_to_entry_key(path))]);
373        replica
374            .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(filters))
375            .await?;
376        replica.start_sync(ticket.nodes.clone()).await?;
377        let namespace_id = ticket.capability.id();
378        let mut events = replica.subscribe().await?;
379        let sync_start = std::time::Instant::now();
380        while let Some(event) = events.next().await {
381            if matches!(event?, LiveEvent::SyncFinished(_)) {
382                let elapsed = sync_start.elapsed();
383                info!(
384                    "Synchronisation took {elapsed:?} for {} … ",
385                    crate::fs::util::fmt(namespace_id),
386                );
387                break;
388            }
389        }
390        self.read_file(&namespace_id, path)
391            .await
392            .map_err(|e| anyhow!("{}", e))
393    }
394}