Storage Protocols
SyncEngine is designed to be storage-agnostic through a protocol-based architecture. This document explains how to implement custom storage backends.
Protocol Overview
SyncEngine uses Python’s typing.Protocol for structural subtyping. This means any class that implements the required methods and properties works with SyncEngine without needing to inherit from a base class.
Core protocols:
- StorageClientProtocol: Interface for storage operations (upload, download, delete, folder creation, path resolution, rename)
- FileEntryProtocol: Interface for file/folder metadata
- FileEntriesManagerProtocol: Interface for managing file collections
StorageClientProtocol
The main interface for storage backends.
Required Methods
upload_file()
Upload a file to storage:
def upload_file(
    self,
    file_path: Path,
    relative_path: str,
    storage_id: int = 0,
    chunk_size: int = 5242880,
    use_multipart_threshold: int = 52428800,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> Any:
    """Upload a file to storage.

    Args:
        file_path: Local path of the file to upload.
        relative_path: Destination path in storage (preserves directory
            structure).
        storage_id: Storage/workspace identifier (0 for default).
        chunk_size: Size of upload chunks in bytes (default 5 MiB).
        use_multipart_threshold: File size in bytes that triggers a
            multipart upload (default 50 MiB).
        progress_callback: Called with (bytes_uploaded, total_bytes).

    Returns:
        Upload result (implementation-specific).
    """
    ...
download_file()
Download a file from storage:
def download_file(
    self,
    file_id: str,
    output_path: Path,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> Path:
    """Download a file from storage.

    Args:
        file_id: File identifier for download operations. This should be the
            content-based identifier (typically from FileEntry.hash), which is
            used by the storage API's download endpoint. It is the identifier
            returned by DestinationFile.get_download_identifier(), which
            defaults to the hash field. It is NOT the integer database ID
            from FileEntry.id.
            Examples:
                - Drime Cloud: Base64-encoded download identifier
                  (e.g., "MTEyNDUwfHBhZA")
                - Google Drive: Opaque file ID string (e.g., "1A2B3C4D5E6F7G8H")
                - S3/generic: Content hash or object key
        output_path: Local path where the file should be saved.
        progress_callback: Called with (bytes_downloaded, total_bytes).

    Returns:
        Path where the file was saved.
    """
    ...
delete_file_entries()
Delete files from storage:
def delete_file_entries(
    self,
    entry_ids: list[int],
    delete_forever: bool = False,
) -> Any:
    """Delete file entries from storage.

    Args:
        entry_ids: Integer IDs (FileEntry.id) of the entries to delete.
        delete_forever: If True, delete permanently; if False, move to trash.

    Returns:
        Delete result (implementation-specific).
    """
    ...
create_folder()
Create a folder in storage:
def create_folder(
    self,
    name: str,
    parent_id: Optional[int] = None,
    storage_id: int = 0,
) -> dict[str, Any]:
    """Create a folder in storage.

    Args:
        name: Folder name (may include path separators for nested folders).
        parent_id: Parent folder ID (None for root).
        storage_id: Storage/workspace identifier.

    Returns:
        Dictionary with 'status' and 'id' keys.
    """
    ...
resolve_path_to_id()
Resolve a path to an entry ID:
def resolve_path_to_id(
    self,
    path: str,
    storage_id: int = 0,
) -> Optional[int]:
    """Resolve a storage path to an entry ID.

    Args:
        path: Path to resolve.
        storage_id: Storage/workspace identifier.

    Returns:
        Entry ID if found, None otherwise.
    """
    ...
rename_entry()
Rename/move an entry:
def rename_entry(
    self,
    entry_id: int,
    new_name: str,
    new_parent_id: Optional[int] = None,
) -> Any:
    """Rename or move an entry.

    Args:
        entry_id: ID of the entry to rename/move.
        new_name: New name.
        new_parent_id: New parent folder ID (for moves).

    Returns:
        Rename result (implementation-specific).
    """
    ...
FileEntryProtocol
Interface for file/folder metadata.
Required Properties
from typing import Protocol, Optional


class FileEntryProtocol(Protocol):
    """Structural interface for file/folder metadata entries."""

    @property
    def id(self) -> int:
        """Unique identifier (persists across renames)."""
        ...

    @property
    def type(self) -> str:
        """Entry type: 'file' or 'folder'."""
        ...

    @property
    def file_size(self) -> int:
        """File size in bytes (0 for folders)."""
        ...

    @property
    def hash(self) -> str:
        """Content hash or download identifier.

        This field should contain the identifier used for download operations.
        Common formats include MD5/SHA hashes, opaque service IDs, or download
        tokens. This value is used by DestinationFile.get_download_identifier()
        and passed to StorageClientProtocol.download_file() as the file_id
        parameter.
        """
        ...

    @property
    def name(self) -> str:
        """File or folder name."""
        ...

    @property
    def updated_at(self) -> Optional[str]:
        """ISO timestamp of last modification."""
        ...
Example Implementation
class MyFileEntry:
    """Example FileEntryProtocol implementation backed by an API response dict."""

    def __init__(self, data: dict):
        self._data = data

    @property
    def id(self) -> int:
        return self._data['id']

    @property
    def type(self) -> str:
        return self._data['type']

    @property
    def file_size(self) -> int:
        # `or 0` also covers APIs that send an explicit null size (e.g. for
        # folders), keeping the protocol's int contract.
        return self._data.get('size') or 0

    @property
    def hash(self) -> str:
        """Content hash or download identifier.

        This field should contain the identifier used by your storage
        service's download API. Common formats:
        - MD5/SHA hash of file content
        - Opaque file ID from the storage service
        - Base64-encoded download token
        - Object key or path identifier
        This value is used by DestinationFile.get_download_identifier()
        to determine what identifier to pass to download_file().
        """
        # A missing or null hash becomes '' so callers always get a str.
        return self._data.get('hash') or ''

    @property
    def name(self) -> str:
        return self._data['name']

    @property
    def updated_at(self) -> Optional[str]:
        return self._data.get('updated_at')
FileEntriesManagerProtocol
Interface for managing collections of file entries.
Required Methods
get_all_entries()
Get all entries in storage:
def get_all_entries(self) -> Iterator["FileEntryProtocol"]:
    """Get all file entries.

    Yields:
        File entries, one at a time.
    """
    ...
refresh()
Refresh the entry cache:
def refresh(self) -> None:
    """Refresh the internal cache of entries."""
    ...
get_entry_by_id()
Get an entry by ID:
def get_entry_by_id(self, entry_id: int) -> Optional["FileEntryProtocol"]:
    """Get an entry by its ID.

    Args:
        entry_id: Entry ID to look up.

    Returns:
        The file entry if found, None otherwise.
    """
    ...
Implementing a Storage Backend
Here’s a complete example of implementing a custom storage backend:
Example: S3 Storage Backend
import hashlib
import threading
from pathlib import Path
from typing import Any, Callable, Iterator, Optional

import boto3
from boto3.s3.transfer import TransferConfig

from syncengine.protocols import (
    StorageClientProtocol,
    FileEntryProtocol,
    FileEntriesManagerProtocol
)
def s3_key_to_id(key: str) -> int:
    """Return a stable, non-negative integer ID for an S3 object key.

    The builtin ``hash()`` of a ``str`` is salted per interpreter process
    (see ``PYTHONHASHSEED``), so it would give the same key a different ID
    on every run. A truncated SHA-256 digest is identical across processes
    and machines. S3 has no rename-stable object IDs, so the ID still
    changes whenever the key changes.
    """
    digest = hashlib.sha256(key.encode('utf-8')).digest()
    # Keep 63 bits so the ID is non-negative and fits a signed 64-bit column.
    return int.from_bytes(digest[:8], 'big') >> 1


class S3FileEntry:
    """File entry for one S3 object dict as returned by ``list_objects_v2``.

    Keys ending in '/' are treated as folder markers.
    """

    def __init__(self, obj: dict, bucket: str):
        self._obj = obj
        self._bucket = bucket

    @property
    def key(self) -> str:
        """Full S3 object key (including any prefix)."""
        return self._obj['Key']

    @property
    def id(self) -> int:
        return s3_key_to_id(self.key)

    @property
    def type(self) -> str:
        return 'folder' if self.key.endswith('/') else 'file'

    @property
    def file_size(self) -> int:
        return self._obj.get('Size', 0)

    @property
    def hash(self) -> str:
        # The ETag is the content MD5 only for single-part uploads that are
        # not SSE-KMS/SSE-C encrypted; multipart ETags look like '<hex>-<parts>'.
        return self._obj.get('ETag', '').strip('"')

    @property
    def name(self) -> str:
        # Drop a folder marker's trailing '/' first; otherwise
        # 'docs/sub/'.split('/')[-1] would be ''.
        return self.key.rstrip('/').split('/')[-1]

    @property
    def updated_at(self) -> Optional[str]:
        dt = self._obj.get('LastModified')
        return dt.isoformat() if dt else None


class S3EntriesManager:
    """Lists and caches the S3 objects under a prefix."""

    def __init__(self, client: 'S3StorageClient', bucket: str, prefix: str = ''):
        self.client = client
        self.bucket = bucket
        self.prefix = prefix
        # Keyed by entry ID so get_entry_by_id() is a dict lookup.
        self._entries: dict[int, S3FileEntry] = {}
        self.refresh()

    def refresh(self) -> None:
        """Re-list every object under the prefix and rebuild the cache.

        Each entry is also registered with the client so its ID and ETag can
        later be translated back into an S3 key.
        """
        stripped = self.prefix.strip('/')
        # A trailing '/' keeps 'sync-folder' from also matching sibling
        # prefixes such as 'sync-folder-old/...'.
        list_prefix = f"{stripped}/" if stripped else ''
        entries: dict[int, S3FileEntry] = {}
        paginator = self.client.s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.bucket, Prefix=list_prefix):
            for obj in page.get('Contents', []):
                entry = S3FileEntry(obj, self.bucket)
                entries[entry.id] = entry
                self.client.register_entry(entry.key, entry.hash)
        self._entries = entries

    def get_all_entries(self) -> Iterator['FileEntryProtocol']:
        """Yield the cached entries in listing order."""
        yield from self._entries.values()

    def get_entry_by_id(self, entry_id: int) -> Optional['FileEntryProtocol']:
        """Return the cached entry with ``entry_id``, or None."""
        return self._entries.get(entry_id)


class S3StorageClient:
    """S3 storage backend for SyncEngine.

    Objects live at ``<prefix>/<relative path>`` in ``bucket``. S3 addresses
    objects only by key, so the client keeps an in-memory registry that maps
    entry IDs and ETags back to keys. The registry is filled by
    S3EntriesManager.refresh() and by this client's own uploads, folder
    creations, path lookups and renames. An ID or hash the client has never
    seen raises KeyError.
    """

    # DeleteObjects accepts at most 1000 keys per request.
    _DELETE_BATCH_SIZE = 1000

    def __init__(self, bucket: str, prefix: str = '', **kwargs):
        """Create the boto3 client; ``kwargs`` go to ``boto3.client('s3', ...)``."""
        self.bucket = bucket
        self.prefix = prefix
        self.s3_client = boto3.client('s3', **kwargs)
        self._keys_by_id: dict[int, str] = {}
        self._keys_by_hash: dict[str, str] = {}

    def register_entry(self, key: str, etag: str = '') -> int:
        """Remember ``key`` (and its ETag, if given) and return its entry ID."""
        entry_id = s3_key_to_id(key)
        self._keys_by_id[entry_id] = key
        if etag:
            # Objects sharing an ETag are expected to share content, so any
            # of them can serve a download request for that hash.
            self._keys_by_hash[etag] = key
        return entry_id

    def upload_file(
        self,
        file_path: Path,
        relative_path: str,
        storage_id: int = 0,
        chunk_size: int = 5242880,
        use_multipart_threshold: int = 52428800,
        progress_callback: Optional[Callable[[int, int], None]] = None
    ) -> Any:
        """Upload ``file_path`` to ``<prefix>/<relative_path>``.

        ``chunk_size`` and ``use_multipart_threshold`` are passed to boto3 as a
        TransferConfig. ``storage_id`` is ignored because one client serves one
        bucket. The object's ETag is not known here; it is registered on the
        next S3EntriesManager.refresh().
        """
        key = self._make_key(relative_path)
        file_size = file_path.stat().st_size
        config = TransferConfig(
            multipart_threshold=use_multipart_threshold,
            multipart_chunksize=chunk_size,
        )
        self.s3_client.upload_file(
            str(file_path),
            self.bucket,
            key,
            Config=config,
            Callback=self._progress_reporter(progress_callback, file_size),
        )
        self.register_entry(key)
        return {'key': key, 'bucket': self.bucket}

    def download_file(
        self,
        file_id: str,
        output_path: Path,
        progress_callback: Optional[Callable[[int, int], None]] = None
    ) -> Path:
        """Download the object whose ETag is ``file_id`` to ``output_path``.

        ``file_id`` is S3FileEntry.hash (the ETag). It is mapped back to a key
        through the registry. Missing parent directories of ``output_path``
        are created.
        """
        key = self._hash_to_key(file_id)
        response = self.s3_client.head_object(Bucket=self.bucket, Key=key)
        file_size = response['ContentLength']
        output_path.parent.mkdir(parents=True, exist_ok=True)
        self.s3_client.download_file(
            self.bucket,
            key,
            str(output_path),
            Callback=self._progress_reporter(progress_callback, file_size),
        )
        return output_path

    def delete_file_entries(
        self,
        entry_ids: list[int],
        delete_forever: bool = False
    ) -> Any:
        """Delete the objects behind ``entry_ids``.

        S3 has no trash, so deletion is always permanent and ``delete_forever``
        is ignored. Deleting a folder marker does not delete objects under it.

        Returns:
            ``{'deleted': <count reported by S3>, 'errors': <per-key errors>}``.
        """
        keys = [self._id_to_key(entry_id) for entry_id in entry_ids]
        deleted = 0
        errors: list[dict[str, Any]] = []
        # Batch to respect the per-request key limit. An empty id list sends
        # no request at all, because S3 rejects a DeleteObjects call with no keys.
        for start in range(0, len(keys), self._DELETE_BATCH_SIZE):
            batch = keys[start:start + self._DELETE_BATCH_SIZE]
            response = self.s3_client.delete_objects(
                Bucket=self.bucket,
                Delete={'Objects': [{'Key': key} for key in batch]},
            )
            deleted += len(response.get('Deleted', []))
            errors.extend(response.get('Errors', []))
        failed = {error.get('Key') for error in errors}
        for entry_id, key in zip(entry_ids, keys):
            if key not in failed:
                self._keys_by_id.pop(entry_id, None)
        return {'deleted': deleted, 'errors': errors}

    def create_folder(
        self,
        name: str,
        parent_id: Optional[int] = None,
        storage_id: int = 0
    ) -> dict[str, Any]:
        """Create a folder marker (a zero-byte object whose key ends in '/').

        Without ``parent_id`` the folder is created under the prefix root;
        with it, under that parent's key. Only the final marker is written.
        """
        if parent_id is None:
            key = self._make_key(name, folder=True)
        else:
            parent_key = self._id_to_key(parent_id).rstrip('/')
            key = f"{parent_key}/{name.strip('/')}/"
        self.s3_client.put_object(Bucket=self.bucket, Key=key, Body=b'')
        return {'status': 'created', 'id': self.register_entry(key)}

    def resolve_path_to_id(
        self,
        path: str,
        storage_id: int = 0
    ) -> Optional[int]:
        """Return the ID of the object at ``path``, or None if it does not exist.

        The path is tried as a file key first, then as a folder marker
        ('<path>/', the form create_folder() writes).
        """
        for key in (self._make_key(path), self._make_key(path, folder=True)):
            try:
                self.s3_client.head_object(Bucket=self.bucket, Key=key)
            except self.s3_client.exceptions.ClientError as exc:
                # HEAD responses carry no error body, so a missing key surfaces
                # as a generic ClientError with code '404', not NoSuchKey.
                code = exc.response.get('Error', {}).get('Code')
                if code in ('404', 'NoSuchKey', 'NotFound'):
                    continue
                raise
            return self.register_entry(key)
        return None

    def rename_entry(
        self,
        entry_id: int,
        new_name: str,
        new_parent_id: Optional[int] = None
    ) -> Any:
        """Rename an object, or move it when ``new_parent_id`` is given.

        S3 has no rename: the object is copied to the new key, and then the old
        key is deleted. Without ``new_parent_id`` the object keeps its current
        directory. Renaming a folder marker does not move the objects under
        it, and ``copy_object`` only handles objects up to 5 GB.
        """
        old_key = self._id_to_key(entry_id)
        if new_parent_id is None:
            parent = old_key.rstrip('/').rpartition('/')[0]
        else:
            parent = self._id_to_key(new_parent_id).rstrip('/')
        leaf = new_name.strip('/')
        new_key = f"{parent}/{leaf}" if parent else leaf
        if old_key.endswith('/'):
            new_key += '/'
        if new_key == old_key:
            # Copy-onto-self is rejected by S3; never reach the delete below.
            return {'old_key': old_key, 'new_key': new_key}
        self.s3_client.copy_object(
            Bucket=self.bucket,
            CopySource={'Bucket': self.bucket, 'Key': old_key},
            Key=new_key
        )
        self.s3_client.delete_object(Bucket=self.bucket, Key=old_key)
        self._keys_by_id.pop(entry_id, None)
        self.register_entry(new_key)
        return {'old_key': old_key, 'new_key': new_key}

    def _make_key(self, path: str, *, folder: bool = False) -> str:
        """Join the prefix and a storage-relative path into an S3 key."""
        parts = [part.strip('/') for part in (self.prefix, path)]
        key = '/'.join(part for part in parts if part)
        return f"{key}/" if folder and key else key

    @staticmethod
    def _progress_reporter(
        progress_callback: Optional[Callable[[int, int], None]],
        total: int,
    ) -> Optional[Callable[[int], None]]:
        """Adapt boto3's transfer ``Callback`` to a cumulative progress callback.

        boto3 reports the bytes moved since its previous call, possibly from
        several worker threads during multipart transfers. SyncEngine expects
        cumulative ``(bytes_done, total_bytes)``. Returns None when there is no
        callback to drive.
        """
        if progress_callback is None:
            return None
        lock = threading.Lock()
        done = 0

        def report(bytes_transferred: int) -> None:
            nonlocal done
            with lock:
                done += bytes_transferred
                progress_callback(done, total)

        return report

    def _hash_to_key(self, hash_value: str) -> str:
        """Map a download identifier (ETag) to a registered S3 key."""
        try:
            return self._keys_by_hash[hash_value]
        except KeyError:
            raise KeyError(
                f"Unknown S3 ETag {hash_value!r}; refresh the entries manager first"
            ) from None

    def _id_to_key(self, entry_id: int) -> str:
        """Map an entry ID to a registered S3 key."""
        try:
            return self._keys_by_id[entry_id]
        except KeyError:
            raise KeyError(
                f"Unknown S3 entry id {entry_id!r}; refresh the entries manager first"
            ) from None
# Usage
from syncengine import SyncEngine, SyncPair, SyncMode

# Create S3 client
s3_client = S3StorageClient(
    bucket='my-bucket',
    prefix='sync-folder',
    region_name='us-west-2'
)

# Source-side client for the local directory. It is not defined by this
# example: substitute your own storage client for the source side (e.g. one
# of the existing implementations such as LocalStorageClient; check its
# constructor).
local_client = ...  # TODO: replace with a real source storage client


# Create entries manager factory
def create_entries_manager(client, storage_id):
    # Reuse the client's own bucket/prefix so the listing always covers the
    # same keys the client reads and writes (no duplicated literals).
    return S3EntriesManager(client, client.bucket, client.prefix)


# Create sync engine
engine = SyncEngine(
    client=s3_client,
    entries_manager_factory=create_entries_manager
)

# Create sync pair
pair = SyncPair(
    source_root="/home/user/docs",
    destination_root="",
    source_client=local_client,
    destination_client=s3_client,
    mode=SyncMode.SOURCE_TO_DESTINATION
)

# Sync
stats = engine.sync_pair(pair)
Testing Your Implementation
SyncEngine includes test utilities to verify your storage backend:
import pytest

from syncengine.testing import StorageClientTestSuite

# S3StorageClient and S3EntriesManager are the classes from the S3 example.


class TestS3Client(StorageClientTestSuite):
    def create_client(self):
        """Create a client instance for testing."""
        return S3StorageClient(
            bucket='test-bucket',
            region_name='us-west-2'
        )

    def create_entries_manager(self, client, storage_id):
        """Create an entries manager for testing."""
        # Take bucket/prefix from the client so the two cannot drift apart.
        return S3EntriesManager(client, client.bucket, client.prefix)


# Run tests
if __name__ == '__main__':
    # Guarded: pytest imports this module during collection, and an
    # unguarded pytest.main() would start a nested test run.
    pytest.main([__file__])
Best Practices
Error Handling
Implement proper error handling and retries
Raise meaningful exceptions
Log errors for debugging
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def upload_file(self, file_path: Path, relative_path: str, **kwargs):
    """Upload ``file_path``, logging any failure before re-raising it.

    Re-raising keeps the failure visible to the caller; the log line adds the
    file context that the exception itself may lack.
    """
    try:
        # Upload logic
        pass
    except ConnectionError as e:
        # Network-level failures are the usual candidates for a retry.
        logger.error("Connection error uploading %s: %s", file_path, e)
        raise
    except Exception as e:
        # %-style arguments defer formatting until the record is emitted.
        logger.error("Error uploading %s: %s", file_path, e)
        raise
Progress Callbacks
Always call progress callbacks if provided
Report accurate progress
Call with final values on completion
from pathlib import Path


def upload_chunk(chunk: bytes) -> None:
    """Placeholder: send one chunk to your storage service."""


def upload_file(self, file_path: Path, relative_path: str,
                progress_callback=None, chunk_size: int = 8192, **kwargs):
    """Upload ``file_path`` chunk by chunk, reporting cumulative progress.

    ``progress_callback`` (if given) receives ``(bytes_uploaded, file_size)``
    after every chunk. The run always ends with a ``(file_size, file_size)``
    report, including for empty files, where the loop never reports.
    """
    file_size = file_path.stat().st_size
    bytes_uploaded = 0
    last_reported = None
    # Upload in chunks
    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            upload_chunk(chunk)
            bytes_uploaded += len(chunk)
            if progress_callback:
                progress_callback(bytes_uploaded, file_size)
                last_reported = bytes_uploaded
    # Ensure a final callback, without repeating one the loop already sent.
    if progress_callback and last_reported != file_size:
        progress_callback(file_size, file_size)
Content Hashing
Use one consistent hashing algorithm (e.g. MD5 or SHA-256); hashes produced by different algorithms cannot be compared
Compute hashes efficiently
Cache hashes when possible
import hashlib
from pathlib import Path


def compute_hash(file_path: Path, algorithm: str = 'md5',
                 chunk_size: int = 65536) -> str:
    """Compute the hex digest of a file's content (MD5 by default).

    Args:
        file_path: File to hash.
        algorithm: Any name accepted by ``hashlib.new`` (e.g. 'sha256').
        chunk_size: Bytes read per iteration; larger chunks mean fewer
            Python-level loop iterations without loading the whole file.

    Returns:
        Lowercase hexadecimal digest string.
    """
    # usedforsecurity=False: the digest only detects content changes, and
    # this keeps MD5 usable on FIPS-restricted Python builds.
    hasher = hashlib.new(algorithm, usedforsecurity=False)
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            hasher.update(chunk)
    return hasher.hexdigest()
Caching
Cache file listings when possible
Implement efficient refresh mechanisms
Invalidate cache when needed
import time


class CachedEntriesManager:
    """Entries manager that caches the backend listing for ``cache_ttl`` seconds.

    ``client.list_entries()`` stands in for whatever listing call your backend
    offers; its entries must expose an ``id`` attribute.
    """

    def __init__(self, client, cache_ttl: float = 300):
        self.client = client
        self._cache = {}
        self._cache_time = None
        self._cache_ttl = cache_ttl  # seconds (default 300 = 5 minutes)

    def _ensure_fresh(self) -> None:
        """Refresh if the cache was never filled or is older than the TTL."""
        # time.monotonic() is immune to wall-clock jumps (NTP sync, manual
        # changes) that could make time.time()-based ages negative or huge.
        if (self._cache_time is None or
                time.monotonic() - self._cache_time > self._cache_ttl):
            self.refresh()

    def get_all_entries(self):
        """Yield all cached entries, refreshing first if the cache is stale."""
        self._ensure_fresh()
        yield from self._cache.values()

    def get_entry_by_id(self, entry_id):
        """Return the cached entry with ``entry_id``, or None."""
        self._ensure_fresh()
        return self._cache.get(entry_id)

    def refresh(self):
        """Fetch entries from storage and replace the cache."""
        # Build a new dict and swap it in only on success, so a listing error
        # leaves the previous cache intact instead of half-filled.
        fresh = {entry.id: entry for entry in self.client.list_entries()}
        self._cache = fresh
        self._cache_time = time.monotonic()
Next Steps
Examples - See complete examples
API Reference - Detailed API documentation
Review existing implementations (LocalStorageClient, etc.)