oku_fs/fs/
directory.rs

1use super::*;
2use crate::error::OkuFsError;
3use anyhow::anyhow;
4use bytes::Bytes;
5use futures::{future, pin_mut, StreamExt};
6use iroh_blobs::Hash;
7use iroh_docs::rpc::client::docs::Entry;
8use iroh_docs::store::FilterKind;
9use iroh_docs::DocTicket;
10use iroh_docs::NamespaceId;
11use log::error;
12use miette::IntoDiagnostic;
13use rayon::iter::{
14    IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
15};
16use std::path::Path;
17use std::path::PathBuf;
18use util::entry_key_to_path;
19use util::normalise_path;
20use util::path_to_entry_prefix;
21
22impl OkuFs {
23    /// Reads the contents of the files in a directory.
24    ///
25    /// # Arguments
26    ///
27    /// * `namespace_id` - The ID of the replica containing the folder.
28    ///
29    /// * `path` - The folder whose contents will be read.
30    ///
31    /// # Returns
32    ///
33    /// A list of file entries and the corresponding content as bytes.
34    pub async fn read_directory(
35        &self,
36        namespace_id: &NamespaceId,
37        path: &Path,
38    ) -> miette::Result<Vec<(Entry, Bytes)>> {
39        let entries = self
40            .list_files(namespace_id, &Some(path.to_path_buf()))
41            .await?;
42        let bytes = future::try_join_all(entries.iter().map(|entry| self.content_bytes(entry)))
43            .await
44            .map_err(|e| miette::miette!("{}", e))?;
45        Ok(entries.into_par_iter().zip(bytes.into_par_iter()).collect())
46    }
47
48    /// Moves a directory by copying it to a new location and deleting the original.
49    ///
50    /// # Arguments
51    ///
52    /// * `from_namespace_id` - The ID of the replica containing the directory to move.
53    ///
54    /// * `to_namespace_id` - The ID of the replica to move the directory to.
55    ///
56    /// * `from_path` - The path of the directory to move.
57    ///
58    /// * `to_path` - The path to move the directory to.
59    ///
60    /// # Returns
61    ///
62    /// A tuple containing the list of file hashes for files at their new destinations, and the total number of replica entries deleted during the operation.
63    pub async fn move_directory(
64        &self,
65        from_namespace_id: &NamespaceId,
66        from_path: &Path,
67        to_namespace_id: &NamespaceId,
68        to_path: &Path,
69    ) -> miette::Result<(Vec<Hash>, usize)> {
70        let mut entries_deleted = 0;
71        let mut moved_file_hashes = Vec::new();
72        let old_directory_files = self
73            .list_files(from_namespace_id, &Some(from_path.to_path_buf()))
74            .await?;
75        for old_directory_file in old_directory_files {
76            let old_file_path = entry_key_to_path(old_directory_file.key())?;
77            let new_file_path = to_path.join(old_file_path.file_name().unwrap_or_default());
78            let file_move_info = self
79                .move_file(
80                    from_namespace_id,
81                    &old_file_path,
82                    to_namespace_id,
83                    &new_file_path,
84                )
85                .await?;
86            moved_file_hashes.push(file_move_info.0);
87            entries_deleted += file_move_info.1;
88        }
89        Ok((moved_file_hashes, entries_deleted))
90    }
91
92    /// Deletes a directory and all its contents.
93    ///
94    /// # Arguments
95    ///
96    /// * `namespace_id` - The ID of the replica containing the directory to delete.
97    ///
98    /// * `path` - The path of the directory to delete.
99    ///
100    /// # Returns
101    ///
102    /// The number of entries deleted.
103    pub async fn delete_directory(
104        &self,
105        namespace_id: &NamespaceId,
106        path: &PathBuf,
107    ) -> miette::Result<usize> {
108        let path = normalise_path(path).join(""); // Ensure path ends with a slash
109        let file_key = path_to_entry_prefix(&path);
110        let docs_client = &self.docs.client();
111        let document = docs_client
112            .open(*namespace_id)
113            .await
114            .map_err(|e| {
115                error!("{}", e);
116                OkuFsError::CannotOpenReplica
117            })?
118            .ok_or(OkuFsError::FsEntryNotFound)?;
119        let mut entries_deleted = 0;
120        let query = iroh_docs::store::Query::single_latest_per_key()
121            .key_prefix(file_key)
122            .build();
123        let entries = document.get_many(query).await.map_err(|e| {
124            error!("{}", e);
125            OkuFsError::CannotListFiles
126        })?;
127        pin_mut!(entries);
128        let files: Vec<Entry> = entries.map(|entry| entry.unwrap()).collect().await;
129        for file in files {
130            entries_deleted += document
131                .del(
132                    file.author(),
133                    (std::str::from_utf8(&path_to_entry_prefix(&entry_key_to_path(file.key())?))
134                        .into_diagnostic()?)
135                    .to_string(),
136                )
137                .await
138                .map_err(|e| {
139                    error!("{}", e);
140                    OkuFsError::CannotDeleteDirectory
141                })?;
142        }
143        Ok(entries_deleted)
144    }
145
146    /// Determines the oldest timestamp of a file entry in a folder.
147    ///
148    /// # Arguments
149    ///
150    /// * `namespace_id` - The ID of the replica containing the folder.
151    ///
152    /// * `path` - The folder whose oldest timestamp is to be determined.
153    ///
154    /// # Returns
155    ///
156    /// The oldest timestamp of any file descending from this folder, in microseconds from the Unix epoch.
157    pub async fn get_oldest_timestamp_in_folder(
158        &self,
159        namespace_id: &NamespaceId,
160        path: &Path,
161    ) -> miette::Result<u64> {
162        let files = self
163            .list_files(namespace_id, &Some(path.to_path_buf()))
164            .await?;
165        let mut timestamps: Vec<u64> = Vec::new();
166        for file in files {
167            timestamps.push(
168                self.get_oldest_entry_timestamp(namespace_id, &entry_key_to_path(file.key())?)
169                    .await?,
170            );
171        }
172        Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
173    }
174
175    /// Determines the latest timestamp of a file entry in a folder.
176    ///
177    /// # Arguments
178    ///
179    /// * `namespace_id` - The ID of the replica containing the folder.
180    ///
181    /// * `path` - The folder whose latest timestamp is to be determined.
182    ///
183    /// # Returns
184    ///
185    /// The latest timestamp of any file descending from this folder, in microseconds from the Unix epoch.
186    pub async fn get_newest_timestamp_in_folder(
187        &self,
188        namespace_id: &NamespaceId,
189        path: &Path,
190    ) -> miette::Result<u64> {
191        let files = self
192            .list_files(namespace_id, &Some(path.to_path_buf()))
193            .await?;
194        let mut timestamps: Vec<u64> = Vec::new();
195        for file in files {
196            timestamps.push(file.timestamp());
197        }
198        Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
199    }
200
201    /// Determines the size of a folder.
202    ///
203    /// # Arguments
204    ///
205    /// * `namespace_id` - The ID of the replica containing the folder.
206    ///
207    /// * `path` - The path to the folder within the replica.
208    ///
209    /// # Returns
210    ///
211    /// The total size, in bytes, of the files descending from this folder.
212    pub async fn get_folder_size(
213        &self,
214        namespace_id: &NamespaceId,
215        path: &Path,
216    ) -> miette::Result<u64> {
217        let files = self
218            .list_files(namespace_id, &Some(path.to_path_buf()))
219            .await?;
220        let mut size = 0;
221        for file in files {
222            size += file.content_len();
223        }
224        Ok(size)
225    }
226
227    /// Join a swarm to fetch the latest version of a directory and save it to the local machine.
228    ///
229    /// # Arguments
230    ///
231    /// * `ticket` - A ticket for the replica containing the directory to retrieve.
232    ///
233    /// * `path` - The path to the directory to retrieve.
234    ///
235    /// # Returns
236    ///
237    /// The content of the files in the directory.
238    pub async fn fetch_directory_with_ticket(
239        &self,
240        ticket: &DocTicket,
241        path: &Path,
242        filters: &Option<Vec<FilterKind>>,
243    ) -> anyhow::Result<Vec<(Entry, Bytes)>> {
244        self.fetch_replica_by_ticket(ticket, &Some(path.to_path_buf()), filters)
245            .await?;
246        self.read_directory(&ticket.capability.id(), path)
247            .await
248            .map_err(|e| anyhow!("{}", e))
249    }
250}