1use super::*;
2use crate::error::OkuFsError;
3use anyhow::anyhow;
4use bytes::Bytes;
5use futures::{pin_mut, StreamExt};
6use iroh_blobs::Hash;
7use iroh_docs::engine::LiveEvent;
8use iroh_docs::rpc::client::docs::Doc;
9use iroh_docs::rpc::client::docs::Entry;
10use iroh_docs::store::FilterKind;
11use iroh_docs::DocTicket;
12use iroh_docs::NamespaceId;
13use log::{error, info};
14use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
15use std::path::PathBuf;
16use util::path_to_entry_key;
17use util::path_to_entry_prefix;
18
19impl OkuFs {
20 pub async fn list_files(
32 &self,
33 namespace_id: &NamespaceId,
34 path: &Option<PathBuf>,
35 ) -> miette::Result<Vec<Entry>> {
36 let docs_client = &self.docs.client();
37 let document = docs_client
38 .open(*namespace_id)
39 .await
40 .map_err(|e| {
41 error!("{}", e);
42 OkuFsError::CannotOpenReplica
43 })?
44 .ok_or(OkuFsError::FsEntryNotFound)?;
45 let query = if let Some(path) = path {
46 let file_key = path_to_entry_prefix(path);
47 iroh_docs::store::Query::single_latest_per_key()
48 .key_prefix(file_key)
49 .build()
50 } else {
51 iroh_docs::store::Query::single_latest_per_key().build()
52 };
53 let entries = document.get_many(query).await.map_err(|e| {
54 error!("{}", e);
55 OkuFsError::CannotListFiles
56 })?;
57 pin_mut!(entries);
58 let files: Vec<Entry> = entries.map(|entry| entry.unwrap()).collect().await;
59 Ok(files)
60 }
61
62 pub async fn create_or_modify_file(
76 &self,
77 namespace_id: &NamespaceId,
78 path: &PathBuf,
79 data: impl Into<Bytes>,
80 ) -> miette::Result<Hash> {
81 let file_key = path_to_entry_key(path);
82 let docs_client = &self.docs.client();
83 let document = docs_client
84 .open(*namespace_id)
85 .await
86 .map_err(|e| {
87 error!("{}", e);
88 OkuFsError::CannotOpenReplica
89 })?
90 .ok_or(OkuFsError::FsEntryNotFound)?;
91 let entry_hash = document
92 .set_bytes(self.default_author().await, file_key, data)
93 .await
94 .map_err(|e| {
95 error!("{}", e);
96 OkuFsError::CannotCreateOrModifyFile
97 })?;
98
99 Ok(entry_hash)
100 }
101
102 pub async fn delete_file(
114 &self,
115 namespace_id: &NamespaceId,
116 path: &PathBuf,
117 ) -> miette::Result<usize> {
118 let file_key = path_to_entry_key(path);
119 let docs_client = &self.docs.client();
120 let document = docs_client
121 .open(*namespace_id)
122 .await
123 .map_err(|e| {
124 error!("{}", e);
125 OkuFsError::CannotOpenReplica
126 })?
127 .ok_or(OkuFsError::FsEntryNotFound)?;
128 let query = iroh_docs::store::Query::single_latest_per_key()
129 .key_exact(file_key)
130 .build();
131 let entry = document
132 .get_one(query)
133 .await
134 .map_err(|e| {
135 error!("{}", e);
136 OkuFsError::CannotReadFile
137 })?
138 .ok_or(OkuFsError::FsEntryNotFound)?;
139 let entries_deleted = document
140 .del(entry.author(), entry.key().to_vec())
141 .await
142 .map_err(|e| {
143 error!("{}", e);
144 OkuFsError::CannotDeleteFile
145 })?;
146 Ok(entries_deleted)
147 }
148
149 pub async fn get_entry(
161 &self,
162 namespace_id: &NamespaceId,
163 path: &PathBuf,
164 ) -> miette::Result<Entry> {
165 let file_key = path_to_entry_key(path);
166 let docs_client = &self.docs.client();
167 let document = docs_client
168 .open(*namespace_id)
169 .await
170 .map_err(|e| {
171 error!("{}", e);
172 OkuFsError::CannotOpenReplica
173 })?
174 .ok_or(OkuFsError::FsEntryNotFound)?;
175 let query = iroh_docs::store::Query::single_latest_per_key()
176 .key_exact(file_key)
177 .build();
178 let entry = document
179 .get_one(query)
180 .await
181 .map_err(|e| {
182 error!("{}", e);
183 OkuFsError::CannotReadFile
184 })?
185 .ok_or(OkuFsError::FsEntryNotFound)?;
186 Ok(entry)
187 }
188
189 pub async fn get_oldest_entry_timestamp(
201 &self,
202 namespace_id: &NamespaceId,
203 path: &PathBuf,
204 ) -> miette::Result<u64> {
205 let file_key = path_to_entry_key(path);
206 let docs_client = &self.docs.client();
207 let document = docs_client
208 .open(*namespace_id)
209 .await
210 .map_err(|e| {
211 error!("{}", e);
212 OkuFsError::CannotOpenReplica
213 })?
214 .ok_or(OkuFsError::FsEntryNotFound)?;
215 let query = iroh_docs::store::Query::all().key_exact(file_key).build();
216 let entries = document.get_many(query).await.map_err(|e| {
217 error!("{}", e);
218 OkuFsError::CannotListFiles
219 })?;
220 pin_mut!(entries);
221 let timestamps: Vec<u64> = entries
222 .map(|entry| entry.unwrap().timestamp())
223 .collect()
224 .await;
225 Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
226 }
227
228 pub async fn read_file(
240 &self,
241 namespace_id: &NamespaceId,
242 path: &PathBuf,
243 ) -> miette::Result<Bytes> {
244 let entry = self.get_entry(namespace_id, path).await?;
245 Ok(self.content_bytes(&entry).await.map_err(|e| {
246 error!("{}", e);
247 OkuFsError::CannotReadFile
248 })?)
249 }
250
251 pub async fn read_file_from_replica_handle(
263 &self,
264 document: &Doc,
265 path: &PathBuf,
266 ) -> miette::Result<Bytes> {
267 let file_key = path_to_entry_key(path);
268 let query = iroh_docs::store::Query::single_latest_per_key()
269 .key_exact(file_key)
270 .build();
271 let entry = document
272 .get_one(query)
273 .await
274 .map_err(|e| miette::miette!("{}", e))?
275 .ok_or(OkuFsError::FsEntryNotFound)?;
276 self.content_bytes(&entry)
277 .await
278 .map_err(|e| miette::miette!("{}", e))
279 }
280
281 pub async fn move_file(
297 &self,
298 from_namespace_id: &NamespaceId,
299 from_path: &PathBuf,
300 to_namespace_id: &NamespaceId,
301 to_path: &PathBuf,
302 ) -> miette::Result<(Hash, usize)> {
303 let data = self.read_file(from_namespace_id, from_path).await?;
304 let hash = self
305 .create_or_modify_file(to_namespace_id, to_path, data)
306 .await?;
307 let entries_deleted = self.delete_file(from_namespace_id, from_path).await?;
308 Ok((hash, entries_deleted))
309 }
310
311 pub async fn fetch_file(
323 &self,
324 namespace_id: &NamespaceId,
325 path: &PathBuf,
326 filters: &Option<Vec<FilterKind>>,
327 ) -> anyhow::Result<Bytes> {
328 match self.resolve_namespace_id(namespace_id).await {
329 Ok(ticket) => match self.fetch_file_with_ticket(&ticket, path, filters).await {
330 Ok(bytes) => Ok(bytes),
331 Err(e) => {
332 error!("{}", e);
333 Ok(self
334 .read_file(namespace_id, path)
335 .await
336 .map_err(|e| anyhow!("{}", e))?)
337 }
338 },
339 Err(e) => {
340 error!("{}", e);
341 Ok(self
342 .read_file(namespace_id, path)
343 .await
344 .map_err(|e| anyhow!("{}", e))?)
345 }
346 }
347 }
348
349 pub async fn fetch_file_with_ticket(
361 &self,
362 ticket: &DocTicket,
363 path: &PathBuf,
364 filters: &Option<Vec<FilterKind>>,
365 ) -> anyhow::Result<Bytes> {
366 let docs_client = &self.docs.client();
367 let replica = docs_client
368 .import_namespace(ticket.capability.clone())
369 .await?;
370 let filters = filters
371 .clone()
372 .unwrap_or(vec![FilterKind::Exact(path_to_entry_key(path))]);
373 replica
374 .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(filters))
375 .await?;
376 replica.start_sync(ticket.nodes.clone()).await?;
377 let namespace_id = ticket.capability.id();
378 let mut events = replica.subscribe().await?;
379 let sync_start = std::time::Instant::now();
380 while let Some(event) = events.next().await {
381 if matches!(event?, LiveEvent::SyncFinished(_)) {
382 let elapsed = sync_start.elapsed();
383 info!(
384 "Synchronisation took {elapsed:?} for {} … ",
385 crate::fs::util::fmt(namespace_id),
386 );
387 break;
388 }
389 }
390 self.read_file(&namespace_id, path)
391 .await
392 .map_err(|e| anyhow!("{}", e))
393 }
394}