oku_fs/fs/
core.rs

1use super::*;
2use crate::discovery::{INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY};
3use bytes::Bytes;
4#[cfg(feature = "fuse")]
5use fuse_mt::spawn_mount;
6use iroh::protocol::ProtocolHandler;
7use iroh_blobs::store::Store;
8use iroh_docs::Author;
9use log::{error, info};
10#[cfg(feature = "fuse")]
11use miette::IntoDiagnostic;
12use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
13#[cfg(feature = "fuse")]
14use std::collections::HashMap;
15use std::path::PathBuf;
16#[cfg(feature = "fuse")]
17use std::sync::Arc;
18#[cfg(feature = "fuse")]
19use std::sync::RwLock;
20#[cfg(feature = "fuse")]
21use tokio::runtime::Handle;
22use tokio::sync::watch::{self};
23
24impl OkuFs {
25    /// Obtain the private key of the node's authorship credentials.
26    ///
27    /// # Return
28    ///
29    /// The private key of the node's authorship credentials.
30    pub async fn get_author(&self) -> anyhow::Result<Author> {
31        let default_author_id = self.default_author().await;
32
33        self.docs
34            .client()
35            .authors()
36            .export(default_author_id)
37            .await
38            .ok()
39            .flatten()
40            .ok_or(anyhow::anyhow!(
41                "Missing private key for default author ({}).",
42                crate::fs::util::fmt_short(default_author_id)
43            ))
44    }
45
46    /// Starts an instance of an Oku file system.
47    /// In the background, an Iroh node is started if none is running, or is connected to if one is already running.
48    ///
49    /// # Arguments
50    ///
51    /// * `handle` - If compiling with the `fuse` feature, a Tokio runtime handle is required.
52    ///
53    /// # Returns
54    ///
55    /// A running instance of an Oku file system.
56    pub async fn start(#[cfg(feature = "fuse")] handle: &Handle) -> anyhow::Result<Self> {
57        let endpoint = iroh::Endpoint::builder()
58            .discovery_n0()
59            .discovery_dht()
60            .discovery_local_network()
61            .bind()
62            .await?;
63        let blobs = iroh_blobs::net_protocol::Blobs::persistent(NODE_PATH.clone())
64            .await?
65            .build(&endpoint);
66        let gossip = iroh_gossip::net::Gossip::builder()
67            .spawn(endpoint.clone())
68            .await?;
69        let docs = iroh_docs::protocol::Docs::persistent(NODE_PATH.clone())
70            .spawn(&blobs, &gossip)
71            .await?;
72        let blobs_clone = blobs.clone();
73        let willow = iroh_willow::Engine::spawn(
74            endpoint.clone(),
75            move || {
76                iroh_willow::store::persistent::Store::new(
77                    NODE_PATH.clone(),
78                    blobs_clone.store().clone(),
79                )
80                .unwrap()
81            },
82            Default::default(),
83        );
84
85        let router = iroh::protocol::Router::builder(endpoint.clone())
86            .accept(iroh_blobs::ALPN, blobs.clone())
87            .accept(iroh_gossip::ALPN, gossip.clone())
88            .accept(iroh_docs::ALPN, docs.clone())
89            .spawn()
90            .await?;
91        info!(
92            "Default author ID is {} … ",
93            crate::fs::util::fmt_short(docs.client().authors().default().await.unwrap_or_default())
94        );
95
96        let (replica_sender, _replica_receiver) = watch::channel(());
97        let (okunet_fetch_sender, _okunet_fetch_receiver) = watch::channel(false);
98
99        let oku_fs = Self {
100            endpoint,
101            blobs,
102            docs,
103            willow,
104            router,
105            replica_sender,
106            okunet_fetch_sender,
107            #[cfg(feature = "fuse")]
108            fs_handles: Arc::new(RwLock::new(HashMap::new())),
109            #[cfg(feature = "fuse")]
110            newest_handle: Arc::new(RwLock::new(0)),
111            #[cfg(feature = "fuse")]
112            handle: handle.clone(),
113            dht: mainline::Dht::server()?.as_async(),
114        };
115        let oku_fs_clone = oku_fs.clone();
116        tokio::spawn(async move {
117            loop {
118                tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
119                match oku_fs_clone.announce_replicas().await {
120                    Ok(_) => info!("Announced all replicas … "),
121                    Err(e) => error!("{}", e),
122                }
123                match oku_fs_clone.refresh_users().await {
124                    Ok(_) => info!("Refreshed OkuNet database … "),
125                    Err(e) => error!("{}", e),
126                }
127                tokio::time::sleep(REPUBLISH_DELAY - INITIAL_PUBLISH_DELAY).await;
128            }
129        });
130        Ok(oku_fs.clone())
131    }
132
133    /// Shuts down the Oku file system.
134    pub async fn shutdown(self) {
135        let _ = self.endpoint.close().await;
136        let _ = self.router.shutdown().await;
137        self.docs.shutdown().await;
138        self.blobs.shutdown().await;
139        self.blobs.store().shutdown().await;
140    }
141
142    /// Retrieve the content of a document entry.
143    ///
144    /// # Arguments
145    ///
146    /// * `entry` - An entry in an Iroh document.
147    ///
148    /// # Returns
149    ///
150    /// The content of the entry, as raw bytes.
151    pub async fn content_bytes(&self, entry: &iroh_docs::Entry) -> anyhow::Result<Bytes> {
152        self.content_bytes_by_hash(&entry.content_hash()).await
153    }
154
155    /// Retrieve the content of a document entry by its hash.
156    ///
157    /// # Arguments
158    ///
159    /// * `hash` - The content hash of an Iroh document.
160    ///
161    /// # Returns
162    ///
163    /// The content of the entry, as raw bytes.
164    pub async fn content_bytes_by_hash(&self, hash: &iroh_blobs::Hash) -> anyhow::Result<Bytes> {
165        self.blobs.client().read_to_bytes(*hash).await
166    }
167
168    /// Determines the oldest timestamp of a file entry in any replica stored locally.
169    ///
170    /// # Returns
171    ///
172    /// The oldest timestamp in any local replica, in microseconds from the Unix epoch.
173    pub async fn get_oldest_timestamp(&self) -> miette::Result<u64> {
174        let replicas = self.list_replicas().await?;
175        let mut timestamps: Vec<u64> = Vec::new();
176        for (replica, _capability_kind) in replicas {
177            timestamps.push(
178                self.get_oldest_timestamp_in_folder(&replica, &PathBuf::from("/"))
179                    .await?,
180            );
181        }
182        Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
183    }
184
185    /// Determines the latest timestamp of a file entry in any replica stored locally.
186    ///
187    /// # Returns
188    ///
189    /// The latest timestamp in any local replica, in microseconds from the Unix epoch.
190    pub async fn get_newest_timestamp(&self) -> miette::Result<u64> {
191        let replicas = self.list_replicas().await?;
192        let mut timestamps: Vec<u64> = Vec::new();
193        for (replica, _capability_kind) in replicas {
194            timestamps.push(
195                self.get_newest_timestamp_in_folder(&replica, &PathBuf::from("/"))
196                    .await?,
197            );
198        }
199        Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
200    }
201
202    /// Determines the size of the file system.
203    ///
204    /// # Returns
205    ///
206    /// The total size, in bytes, of the files in every replica stored locally.
207    pub async fn get_size(&self) -> miette::Result<u64> {
208        let replicas = self.list_replicas().await?;
209        let mut size = 0;
210        for (replica, _capability_kind) in replicas {
211            size += self.get_folder_size(&replica, &PathBuf::from("/")).await?;
212        }
213        Ok(size)
214    }
215
216    #[cfg(feature = "fuse")]
217    /// Mount the file system.
218    ///
219    /// # Arguments
220    ///
221    /// * `path` - The path to the file system mount point.
222    ///
223    /// # Returns
224    ///
225    /// A handle referencing the mounted file system; joining or dropping the handle will unmount the file system and shutdown the node.
226    pub fn mount(&self, path: PathBuf) -> miette::Result<fuser::BackgroundSession> {
227        spawn_mount(fuse_mt::FuseMT::new(self.clone(), 1), path, &[]).into_diagnostic()
228    }
229}