1use super::*;
2use crate::error::OkuFsError;
3use anyhow::anyhow;
4use bytes::Bytes;
5use futures::{future, pin_mut, StreamExt};
6use iroh_blobs::Hash;
7use iroh_docs::rpc::client::docs::Entry;
8use iroh_docs::store::FilterKind;
9use iroh_docs::DocTicket;
10use iroh_docs::NamespaceId;
11use log::error;
12use miette::IntoDiagnostic;
13use rayon::iter::{
14 IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
15};
16use std::path::Path;
17use std::path::PathBuf;
18use util::entry_key_to_path;
19use util::normalise_path;
20use util::path_to_entry_prefix;
21
22impl OkuFs {
23 pub async fn read_directory(
35 &self,
36 namespace_id: &NamespaceId,
37 path: &Path,
38 ) -> miette::Result<Vec<(Entry, Bytes)>> {
39 let entries = self
40 .list_files(namespace_id, &Some(path.to_path_buf()))
41 .await?;
42 let bytes = future::try_join_all(entries.iter().map(|entry| self.content_bytes(entry)))
43 .await
44 .map_err(|e| miette::miette!("{}", e))?;
45 Ok(entries.into_par_iter().zip(bytes.into_par_iter()).collect())
46 }
47
48 pub async fn move_directory(
64 &self,
65 from_namespace_id: &NamespaceId,
66 from_path: &Path,
67 to_namespace_id: &NamespaceId,
68 to_path: &Path,
69 ) -> miette::Result<(Vec<Hash>, usize)> {
70 let mut entries_deleted = 0;
71 let mut moved_file_hashes = Vec::new();
72 let old_directory_files = self
73 .list_files(from_namespace_id, &Some(from_path.to_path_buf()))
74 .await?;
75 for old_directory_file in old_directory_files {
76 let old_file_path = entry_key_to_path(old_directory_file.key())?;
77 let new_file_path = to_path.join(old_file_path.file_name().unwrap_or_default());
78 let file_move_info = self
79 .move_file(
80 from_namespace_id,
81 &old_file_path,
82 to_namespace_id,
83 &new_file_path,
84 )
85 .await?;
86 moved_file_hashes.push(file_move_info.0);
87 entries_deleted += file_move_info.1;
88 }
89 Ok((moved_file_hashes, entries_deleted))
90 }
91
92 pub async fn delete_directory(
104 &self,
105 namespace_id: &NamespaceId,
106 path: &PathBuf,
107 ) -> miette::Result<usize> {
108 let path = normalise_path(path).join(""); let file_key = path_to_entry_prefix(&path);
110 let docs_client = &self.docs.client();
111 let document = docs_client
112 .open(*namespace_id)
113 .await
114 .map_err(|e| {
115 error!("{}", e);
116 OkuFsError::CannotOpenReplica
117 })?
118 .ok_or(OkuFsError::FsEntryNotFound)?;
119 let mut entries_deleted = 0;
120 let query = iroh_docs::store::Query::single_latest_per_key()
121 .key_prefix(file_key)
122 .build();
123 let entries = document.get_many(query).await.map_err(|e| {
124 error!("{}", e);
125 OkuFsError::CannotListFiles
126 })?;
127 pin_mut!(entries);
128 let files: Vec<Entry> = entries.map(|entry| entry.unwrap()).collect().await;
129 for file in files {
130 entries_deleted += document
131 .del(
132 file.author(),
133 (std::str::from_utf8(&path_to_entry_prefix(&entry_key_to_path(file.key())?))
134 .into_diagnostic()?)
135 .to_string(),
136 )
137 .await
138 .map_err(|e| {
139 error!("{}", e);
140 OkuFsError::CannotDeleteDirectory
141 })?;
142 }
143 Ok(entries_deleted)
144 }
145
146 pub async fn get_oldest_timestamp_in_folder(
158 &self,
159 namespace_id: &NamespaceId,
160 path: &Path,
161 ) -> miette::Result<u64> {
162 let files = self
163 .list_files(namespace_id, &Some(path.to_path_buf()))
164 .await?;
165 let mut timestamps: Vec<u64> = Vec::new();
166 for file in files {
167 timestamps.push(
168 self.get_oldest_entry_timestamp(namespace_id, &entry_key_to_path(file.key())?)
169 .await?,
170 );
171 }
172 Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
173 }
174
175 pub async fn get_newest_timestamp_in_folder(
187 &self,
188 namespace_id: &NamespaceId,
189 path: &Path,
190 ) -> miette::Result<u64> {
191 let files = self
192 .list_files(namespace_id, &Some(path.to_path_buf()))
193 .await?;
194 let mut timestamps: Vec<u64> = Vec::new();
195 for file in files {
196 timestamps.push(file.timestamp());
197 }
198 Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
199 }
200
201 pub async fn get_folder_size(
213 &self,
214 namespace_id: &NamespaceId,
215 path: &Path,
216 ) -> miette::Result<u64> {
217 let files = self
218 .list_files(namespace_id, &Some(path.to_path_buf()))
219 .await?;
220 let mut size = 0;
221 for file in files {
222 size += file.content_len();
223 }
224 Ok(size)
225 }
226
227 pub async fn fetch_directory_with_ticket(
239 &self,
240 ticket: &DocTicket,
241 path: &Path,
242 filters: &Option<Vec<FilterKind>>,
243 ) -> anyhow::Result<Vec<(Entry, Bytes)>> {
244 self.fetch_replica_by_ticket(ticket, &Some(path.to_path_buf()), filters)
245 .await?;
246 self.read_directory(&ticket.capability.id(), path)
247 .await
248 .map_err(|e| anyhow!("{}", e))
249 }
250}