1use super::*;
2use crate::config::OkuFsConfig;
3use crate::database::core::DATABASE;
4use crate::database::dht::ReplicaAnnouncement;
5use crate::error::{OkuDiscoveryError, OkuFsError, OkuFuseError};
6use anyhow::anyhow;
7use futures::{pin_mut, StreamExt};
8use iroh_base::ticket::Ticket;
9use iroh_docs::engine::LiveEvent;
10use iroh_docs::rpc::client::docs::ShareMode;
11use iroh_docs::rpc::AddrInfoOptions;
12use iroh_docs::store::FilterKind;
13use iroh_docs::sync::CapabilityKind;
14use iroh_docs::DocTicket;
15use iroh_docs::NamespaceId;
16use log::{error, info};
17use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
18use std::path::PathBuf;
19use util::{merge_tickets, path_to_entry_prefix};
20
21impl OkuFs {
22 pub async fn create_replica(&self) -> miette::Result<NamespaceId> {
28 let docs_client = &self.docs.client();
29 let new_document = docs_client.create().await.map_err(|e| {
30 error!("{}", e);
31 OkuFsError::CannotCreateReplica
32 })?;
33 let document_id = new_document.id();
34 new_document.close().await.map_err(|e| {
35 error!("{}", e);
36 OkuFsError::CannotExitReplica
37 })?;
38 self.replica_sender.send_replace(());
39 Ok(document_id)
40 }
41
42 pub async fn delete_replica(&self, namespace_id: &NamespaceId) -> miette::Result<()> {
48 let docs_client = &self.docs.client();
49 self.replica_sender.send_replace(());
50 Ok(docs_client.drop_doc(*namespace_id).await.map_err(|e| {
51 error!("{}", e);
52 OkuFsError::CannotDeleteReplica
53 })?)
54 }
55
56 pub async fn list_replicas(&self) -> miette::Result<Vec<(NamespaceId, CapabilityKind)>> {
62 let docs_client = &self.docs.client();
63 let replicas = docs_client.list().await.map_err(|e| {
64 error!("{}", e);
65 OkuFsError::CannotListReplicas
66 })?;
67 pin_mut!(replicas);
68 let mut replica_ids: Vec<(NamespaceId, CapabilityKind)> = replicas
69 .filter_map(|replica| async move { replica.ok() })
70 .collect()
71 .await;
72
73 let config = OkuFsConfig::load_or_create_config()?;
74 if let Some(home_replica) = config.home_replica()? {
75 replica_ids.sort_unstable_by_key(|(namespace_id, capability_kind)| {
76 (
77 *namespace_id != home_replica,
78 !matches!(capability_kind, CapabilityKind::Write),
79 )
80 });
81 } else {
82 replica_ids.sort_unstable_by_key(|(_namespace_id, capability_kind)| {
83 !matches!(capability_kind, CapabilityKind::Write)
84 });
85 }
86 Ok(replica_ids)
87 }
88
89 pub async fn get_replica_capability(
99 &self,
100 namespace_id: &NamespaceId,
101 ) -> miette::Result<CapabilityKind> {
102 let replicas_vec = self.list_replicas().await?;
103 match replicas_vec
104 .par_iter()
105 .find_any(|replica| replica.0 == *namespace_id)
106 {
107 Some(replica) => Ok(replica.1),
108 None => Err(OkuFuseError::NoReplica(crate::fs::util::fmt(namespace_id)).into()),
109 }
110 }
111
112 pub async fn fetch_replica_by_id(
120 &self,
121 namespace_id: &NamespaceId,
122 path: &Option<PathBuf>,
123 ) -> anyhow::Result<()> {
124 let ticket = self.resolve_namespace_id(namespace_id).await?;
125 let docs_client = self.docs.client();
126 let replica_sender = self.replica_sender.clone();
127 match path.clone() {
128 Some(path) => {
129 let replica = docs_client.import_namespace(ticket.capability).await?;
130 let filter = FilterKind::Prefix(path_to_entry_prefix(&path));
131 replica
132 .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(vec![
133 filter,
134 ]))
135 .await?;
136 replica.start_sync(ticket.nodes).await?;
137 let mut events = replica.subscribe().await?;
138 let sync_start = std::time::Instant::now();
139 while let Some(event) = events.next().await {
140 if matches!(event?, LiveEvent::SyncFinished(_)) {
141 let elapsed = sync_start.elapsed();
142 info!(
143 "Synchronisation took {elapsed:?} for {} … ",
144 crate::fs::util::fmt(namespace_id),
145 );
146 break;
147 }
148 }
149 }
150 None => {
151 if let Some(replica) = docs_client.open(*namespace_id).await.unwrap_or(None) {
152 replica
153 .set_download_policy(iroh_docs::store::DownloadPolicy::default())
154 .await?;
155 replica.start_sync(ticket.nodes).await?;
156 let mut events = replica.subscribe().await?;
157 let sync_start = std::time::Instant::now();
158 while let Some(event) = events.next().await {
159 if matches!(event?, LiveEvent::SyncFinished(_)) {
160 let elapsed = sync_start.elapsed();
161 info!(
162 "Synchronisation took {elapsed:?} for {} … ",
163 crate::fs::util::fmt(namespace_id),
164 );
165 break;
166 }
167 }
168 } else {
169 let (_replica, mut events) = docs_client.import_and_subscribe(ticket).await?;
170 let sync_start = std::time::Instant::now();
171 while let Some(event) = events.next().await {
172 if matches!(event?, LiveEvent::SyncFinished(_)) {
173 let elapsed = sync_start.elapsed();
174 info!(
175 "Synchronisation took {elapsed:?} for {} … ",
176 crate::fs::util::fmt(namespace_id),
177 );
178 break;
179 }
180 }
181 }
182 }
183 }
184 replica_sender.send_replace(());
185 Ok(())
186 }
187
188 pub async fn fetch_replica_by_ticket(
200 &self,
201 ticket: &DocTicket,
202 path: &Option<PathBuf>,
203 filters: &Option<Vec<FilterKind>>,
204 ) -> anyhow::Result<()> {
205 let namespace_id = ticket.capability.id();
206 let docs_client = self.docs.client();
207 let replica_sender = self.replica_sender.clone();
208 match path.clone() {
209 Some(path) => {
210 let replica = docs_client
211 .import_namespace(ticket.capability.clone())
212 .await?;
213 let filters = filters
214 .clone()
215 .unwrap_or(vec![FilterKind::Prefix(path_to_entry_prefix(&path))]);
216 replica
217 .set_download_policy(iroh_docs::store::DownloadPolicy::NothingExcept(filters))
218 .await?;
219 replica.start_sync(ticket.nodes.clone()).await?;
220 let mut events = replica.subscribe().await?;
221 let sync_start = std::time::Instant::now();
222 while let Some(event) = events.next().await {
223 if matches!(event?, LiveEvent::SyncFinished(_)) {
224 let elapsed = sync_start.elapsed();
225 info!(
226 "Synchronisation took {elapsed:?} for {} … ",
227 crate::fs::util::fmt(namespace_id),
228 );
229 break;
230 }
231 }
232 }
233 None => {
234 if let Some(replica) = docs_client.open(namespace_id).await.unwrap_or(None) {
235 replica
236 .set_download_policy(iroh_docs::store::DownloadPolicy::default())
237 .await?;
238 replica.start_sync(ticket.nodes.clone()).await?;
239 let mut events = replica.subscribe().await?;
240 let sync_start = std::time::Instant::now();
241 while let Some(event) = events.next().await {
242 if matches!(event?, LiveEvent::SyncFinished(_)) {
243 let elapsed = sync_start.elapsed();
244 info!(
245 "Synchronisation took {elapsed:?} for {} … ",
246 crate::fs::util::fmt(namespace_id),
247 );
248 break;
249 }
250 }
251 } else {
252 let (_replica, mut events) =
253 docs_client.import_and_subscribe(ticket.clone()).await?;
254 let sync_start = std::time::Instant::now();
255 while let Some(event) = events.next().await {
256 if matches!(event?, LiveEvent::SyncFinished(_)) {
257 let elapsed = sync_start.elapsed();
258 info!(
259 "Synchronisation took {elapsed:?} for {} … ",
260 crate::fs::util::fmt(namespace_id),
261 );
262 break;
263 }
264 }
265 }
266 }
267 };
268 replica_sender.send_replace(());
269 Ok(())
270 }
271
272 pub async fn sync_replica(&self, namespace_id: &NamespaceId) -> anyhow::Result<()> {
280 let ticket = self.resolve_namespace_id(namespace_id).await?;
281 let docs_client = self.docs.client();
282 let replica_sender = self.replica_sender.clone();
283 let (_replica, mut events) = docs_client.import_and_subscribe(ticket).await?;
284 let sync_start = std::time::Instant::now();
285 while let Some(event) = events.next().await {
286 if matches!(event?, LiveEvent::SyncFinished(_)) {
287 let elapsed = sync_start.elapsed();
288 info!(
289 "Synchronisation took {elapsed:?} for {} … ",
290 crate::fs::util::fmt(namespace_id),
291 );
292 break;
293 }
294 }
295 replica_sender.send_replace(());
296 Ok(())
297 }
298
299 pub async fn resolve_namespace_id(
309 &self,
310 namespace_id: &NamespaceId,
311 ) -> anyhow::Result<DocTicket> {
312 let get_stream = self.dht.get_mutable(namespace_id.as_bytes(), None, None)?;
313 tokio::pin!(get_stream);
314 let mut tickets = Vec::new();
315 while let Some(mutable_item) = get_stream.next().await {
316 let _ = DATABASE.upsert_announcement(&ReplicaAnnouncement {
317 key: mutable_item.key().to_vec(),
318 signature: mutable_item.signature().to_vec(),
319 });
320 tickets.push(DocTicket::from_bytes(mutable_item.value())?)
321 }
322 merge_tickets(&tickets).ok_or(anyhow!(
323 "Could not find tickets for {} … ",
324 crate::fs::util::fmt(namespace_id)
325 ))
326 }
327
328 pub async fn create_document_ticket(
340 &self,
341 namespace_id: &NamespaceId,
342 share_mode: &ShareMode,
343 ) -> miette::Result<DocTicket> {
344 if matches!(share_mode, ShareMode::Write)
345 && matches!(
346 self.get_replica_capability(namespace_id).await?,
347 CapabilityKind::Read
348 )
349 {
350 Err(OkuFsError::CannotShareReplicaWritable(*namespace_id).into())
351 } else {
352 let docs_client = &self.docs.client();
353 let document = docs_client
354 .open(*namespace_id)
355 .await
356 .map_err(|e| {
357 error!("{}", e);
358 OkuFsError::CannotOpenReplica
359 })?
360 .ok_or(OkuFsError::FsEntryNotFound)?;
361 Ok(document
362 .share(share_mode.clone(), AddrInfoOptions::RelayAndAddresses)
363 .await
364 .map_err(|e| {
365 error!("{}", e);
366 OkuDiscoveryError::CannotGenerateSharingTicket
367 })?)
368 }
369 }
370}