Adds GET /api/v1/admin/system, which returns disk (scoped to storage_dir via statvfs), memory, and CPU statistics, plus a server-side alerts array that fires when disk or memory usage exceeds 90%. Disk uses nix::sys::statvfs directly rather than sysinfo's Disks API to avoid mountpoint-matching gymnastics for the storage_dir. A new `Storage::local_root() -> Option<&Path>` trait method exposes the root; the default returns None, so a future S3Storage gets `disk: null` in the response instead of fabricated numbers. CPU is sampled inline (refresh → 250ms sleep → refresh → read), so the endpoint adds 250ms of latency per call. There is no background cache yet — admin traffic is low-volume, and the moving parts aren't worth it until polling shows up. Alerts are evaluated server-side so the frontend can render them without re-implementing the thresholds.
169 lines
5.8 KiB
Rust
169 lines
5.8 KiB
Rust
use std::path::{Path, PathBuf};
|
|
|
|
use async_trait::async_trait;
|
|
use tokio::fs;
|
|
use tokio_util::io::ReaderStream;
|
|
|
|
use super::{Storage, StorageError, StreamingFile};
|
|
|
|
/// Filesystem-backed storage that places every object beneath a single
/// root directory.
///
/// `Debug` and `Clone` are derived so the type can be logged and embedded in
/// other derived types; cloning only copies the root path.
#[derive(Debug, Clone)]
pub struct LocalStorage {
    /// Directory that validated keys are joined onto.
    root: PathBuf,
}
|
|
|
|
impl LocalStorage {
|
|
pub fn new(root: impl Into<PathBuf>) -> Self {
|
|
Self { root: root.into() }
|
|
}
|
|
|
|
fn resolve(&self, key: &str) -> Result<PathBuf, StorageError> {
|
|
// NUL bytes are rejected by the Linux syscall layer, but the
|
|
// error surfaces as an opaque IO failure rather than the
|
|
// explicit `BadKey` the rest of the contract uses. Catch it
|
|
// here so the error path is consistent.
|
|
if key.contains('\0') {
|
|
return Err(StorageError::BadKey);
|
|
}
|
|
let key = key.trim_start_matches('/');
|
|
if key.is_empty() {
|
|
return Err(StorageError::BadKey);
|
|
}
|
|
if key.split('/').any(|seg| seg.is_empty() || seg == "." || seg == "..") {
|
|
return Err(StorageError::BadKey);
|
|
}
|
|
Ok(self.root.join(key))
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Storage for LocalStorage {
|
|
async fn put(&self, key: &str, bytes: &[u8]) -> Result<(), StorageError> {
|
|
let path = self.resolve(key)?;
|
|
if let Some(parent) = path.parent() {
|
|
fs::create_dir_all(parent).await?;
|
|
}
|
|
fs::write(path, bytes).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
|
|
let path = self.resolve(key)?;
|
|
match fs::read(&path).await {
|
|
Ok(b) => Ok(b),
|
|
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(StorageError::NotFound),
|
|
Err(e) => Err(e.into()),
|
|
}
|
|
}
|
|
|
|
async fn get_stream(&self, key: &str) -> Result<StreamingFile, StorageError> {
|
|
let path = self.resolve(key)?;
|
|
let file = match fs::File::open(&path).await {
|
|
Ok(f) => f,
|
|
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
|
return Err(StorageError::NotFound)
|
|
}
|
|
Err(e) => return Err(e.into()),
|
|
};
|
|
let size_bytes = file.metadata().await?.len();
|
|
// 64 KiB chunks: small enough that a few-MB page emits many frames
|
|
// (so streaming is observable), large enough to keep syscalls cheap.
|
|
let stream = ReaderStream::with_capacity(file, 64 * 1024);
|
|
Ok(StreamingFile {
|
|
stream: Box::pin(stream),
|
|
size_bytes,
|
|
})
|
|
}
|
|
|
|
async fn delete(&self, key: &str) -> Result<(), StorageError> {
|
|
let path = self.resolve(key)?;
|
|
match fs::remove_file(&path).await {
|
|
Ok(()) => Ok(()),
|
|
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(StorageError::NotFound),
|
|
Err(e) => Err(e.into()),
|
|
}
|
|
}
|
|
|
|
async fn exists(&self, key: &str) -> Result<bool, StorageError> {
|
|
let path: &Path = &self.resolve(key)?;
|
|
Ok(fs::try_exists(path).await?)
|
|
}
|
|
|
|
fn local_root(&self) -> Option<&Path> {
|
|
Some(&self.root)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// True when `outcome` failed with `StorageError::BadKey`.
    fn is_bad_key<T>(outcome: Result<T, StorageError>) -> bool {
        matches!(outcome, Err(StorageError::BadKey))
    }

    /// True when `outcome` failed with `StorageError::NotFound`.
    fn is_not_found<T>(outcome: Result<T, StorageError>) -> bool {
        matches!(outcome, Err(StorageError::NotFound))
    }

    #[tokio::test]
    async fn put_get_delete_roundtrip() {
        const KEY: &str = "mangas/abc/cover.jpg";

        let scratch = tempdir().unwrap();
        let store = LocalStorage::new(scratch.path());

        store.put(KEY, b"hello").await.unwrap();
        assert!(store.exists(KEY).await.unwrap());
        assert_eq!(store.get(KEY).await.unwrap(), b"hello");

        store.delete(KEY).await.unwrap();
        assert!(!store.exists(KEY).await.unwrap());
    }

    #[tokio::test]
    async fn rejects_path_traversal() {
        let scratch = tempdir().unwrap();
        let store = LocalStorage::new(scratch.path());

        // `..` as the very first segment.
        assert!(is_bad_key(store.put("../escape", b"x").await));
        // `..` buried in the middle of the key.
        assert!(is_bad_key(store.get("a/../../b").await));
        // Nothing at all.
        assert!(is_bad_key(store.exists("").await));
        // `.` segments are refused alongside `..`; cover both the embedded
        // and the standalone form.
        assert!(is_bad_key(store.get("a/./b").await));
        assert!(is_bad_key(store.get(".").await));
        // A doubled slash produces an empty segment.
        assert!(is_bad_key(store.get("a//b").await));
        // NUL bytes are refused explicitly so callers get BadKey instead of
        // an opaque IO error from the kernel.
        assert!(is_bad_key(store.put("a\0b", b"x").await));
    }

    #[tokio::test]
    async fn missing_key_is_not_found() {
        let scratch = tempdir().unwrap();
        let store = LocalStorage::new(scratch.path());

        assert!(is_not_found(store.get("nope").await));
        assert!(is_not_found(store.delete("nope").await));
        assert!(is_not_found(store.get_stream("nope").await));
    }

    #[tokio::test]
    async fn get_stream_emits_multiple_chunks_for_large_files() {
        use futures_util::StreamExt as _;

        let scratch = tempdir().unwrap();
        let store = LocalStorage::new(scratch.path());

        // A 256 KiB payload read in 64 KiB chunks should arrive as ~4 frames.
        let payload = vec![7u8; 256 * 1024];
        store.put("big.bin", &payload).await.unwrap();

        let StreamingFile { mut stream, size_bytes } =
            store.get_stream("big.bin").await.unwrap();
        assert_eq!(size_bytes, payload.len() as u64);

        let (mut chunks, mut total) = (0usize, 0usize);
        while let Some(frame) = stream.next().await {
            total += frame.unwrap().len();
            chunks += 1;
        }

        assert_eq!(total, payload.len());
        assert!(chunks > 1, "expected >1 chunk, got {chunks}");
    }
}
|