Skip to content

Commit 024767d

Browse files
committed
Fixing streaming of files and generalizing for other storage providers.
1 parent 97a2ab6 commit 024767d

5 files changed

Lines changed: 191 additions & 128 deletions

File tree

roda-core/roda-core/src/main/java/org/roda/core/storage/RangeConsumesOutputStream.java

Lines changed: 31 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,88 +7,78 @@
77
*/
88
package org.roda.core.storage;
99

10-
import java.io.File;
1110
import java.io.IOException;
12-
import java.io.InputStream;
1311
import java.io.OutputStream;
14-
import java.io.RandomAccessFile;
15-
import java.nio.file.Files;
16-
import java.nio.file.Path;
1712
import java.util.Date;
1813

19-
import org.apache.commons.io.IOUtils;
2014
import org.roda.core.data.v2.ConsumesSkipableOutputStream;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
2117

2218
public class RangeConsumesOutputStream implements ConsumesSkipableOutputStream {
2319

20+
private static final Logger LOGGER = LoggerFactory.getLogger(RangeConsumesOutputStream.class);
21+
2422
private static final String DEFAULT_MIME_TYPE = "application/octet-stream";
25-
private final Path directAccessPath;
23+
private final SeekableContentPayload payload;
24+
private final String filename;
2625
private final String mediaType;
26+
private final Date lastModified;
27+
private final long size;
2728

28-
public RangeConsumesOutputStream(Path directAccessPath, String mediaType) {
29-
this.directAccessPath = directAccessPath;
29+
public RangeConsumesOutputStream(SeekableContentPayload payload, String filename, Date lastModified, long size,
30+
String mediaType) {
31+
this.payload = payload;
32+
this.filename = filename;
33+
this.lastModified = lastModified;
34+
this.size = size;
3035
this.mediaType = mediaType;
3136
}
3237

33-
public RangeConsumesOutputStream(Path directAccessPath) {
34-
this(directAccessPath, DEFAULT_MIME_TYPE);
38+
public RangeConsumesOutputStream(SeekableContentPayload payload, Binary binary) {
39+
this(payload, binary, DEFAULT_MIME_TYPE);
40+
}
41+
42+
public RangeConsumesOutputStream(SeekableContentPayload payload, Binary binary, String mediaType) {
43+
this.payload = payload;
44+
this.filename = binary.getStoragePath().getName();
45+
this.lastModified = new Date(); // TODO missing information about binary last modified date
46+
this.size = binary.getSizeInBytes();
47+
this.mediaType = mediaType;
3548
}
3649

3750
@Override
3851
public void consumeOutputStream(OutputStream out) throws IOException {
39-
try (InputStream in = Files.newInputStream(directAccessPath)) {
40-
IOUtils.copyLarge(in, out);
41-
}
52+
payload.writeTo(out, 0, getSize());
4253
}
4354

4455
@Override
4556
public void consumeOutputStream(OutputStream out, int from, int len) throws IOException {
46-
try (InputStream in = Files.newInputStream(directAccessPath)) {
47-
IOUtils.copyLarge(in, out, from, len);
48-
}
57+
payload.writeTo(out, from, len);
4958
}
5059

5160
@Override
5261
public void consumeOutputStream(OutputStream out, long from, long end) {
5362
try {
54-
File file = directAccessPath.toFile();
55-
byte[] buffer = new byte[1024];
56-
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
57-
long pos = from;
58-
randomAccessFile.seek(pos);
59-
while (pos < end) {
60-
randomAccessFile.read(buffer);
61-
out.write(buffer);
62-
pos += buffer.length;
63-
}
64-
out.flush();
65-
}
63+
payload.writeTo(out, from, end - from);
6664
} catch (IOException e) {
67-
// ignore
65+
LOGGER.warn("Error writing to output stream", e);
6866
}
6967
}
7068

7169
@Override
7270
public Date getLastModified() {
73-
try {
74-
return new Date(Files.getLastModifiedTime(directAccessPath).toMillis());
75-
} catch (IOException e) {
76-
return null;
77-
}
71+
return lastModified;
7872
}
7973

8074
@Override
8175
public long getSize() {
82-
try {
83-
return Files.size(directAccessPath);
84-
} catch (IOException e) {
85-
return -1;
86-
}
76+
return size;
8777
}
8878

8979
@Override
9080
public String getFileName() {
91-
return directAccessPath.getFileName().toString();
81+
return filename;
9282
}
9383

9484
@Override
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.roda.core.storage;
2+
3+
import java.io.IOException;
4+
import java.io.OutputStream;
5+
6+
public interface SeekableContentPayload extends ContentPayload {
7+
/**
8+
* Writes a specific range of the content to the output stream.
9+
* * @param out The output stream to write to
10+
* @param offset The start byte position (inclusive)
11+
* @param length The number of bytes to write
12+
*/
13+
void writeTo(OutputStream out, long offset, long length) throws IOException;
14+
}

roda-core/roda-core/src/main/java/org/roda/core/storage/fs/FSPathContentPayload.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,22 @@
99

1010
import java.io.IOException;
1111
import java.io.InputStream;
12+
import java.io.OutputStream;
13+
import java.io.RandomAccessFile;
1214
import java.net.URI;
1315
import java.nio.file.Files;
1416
import java.nio.file.Path;
1517
import java.nio.file.StandardCopyOption;
1618

17-
import org.roda.core.storage.ContentPayload;
19+
import org.apache.commons.io.IOUtils;
20+
import org.roda.core.storage.SeekableContentPayload;
1821

1922
/**
2023
* Class that implements {@code ContentPayload} for File System
2124
*
2225
* @author Luis Faria <lfaria@keep.pt>
2326
*/
24-
public class FSPathContentPayload implements ContentPayload {
27+
public class FSPathContentPayload implements SeekableContentPayload {
2528

2629
private final Path path;
2730

@@ -44,4 +47,41 @@ public URI getURI() throws IOException, UnsupportedOperationException {
4447
return path.toUri();
4548
}
4649

50+
@Override
51+
public void writeTo(OutputStream out, long offset, long length) throws IOException {
52+
// 1. Use NIO InputStream (Efficient: uses native pread/lseek)
53+
try (InputStream is = Files.newInputStream(path)) {
54+
55+
// 2. Seek to the start position (Instant operation on files)
56+
long skipped = is.skip(offset);
57+
if (skipped < offset) {
58+
// File is smaller than the offset requested
59+
return;
60+
}
61+
62+
// 3. Transfer only the requested amount
63+
byte[] buffer = new byte[8192]; // Standard 8KB buffer
64+
long remaining = length;
65+
int bytesRead;
66+
67+
// Loop while we still need data AND we haven't hit EOF
68+
while (remaining > 0) {
69+
// Determine how much to read: either the full buffer or the remaining bytes
70+
int bytesToRead = (int) Math.min(buffer.length, remaining);
71+
72+
bytesRead = is.read(buffer, 0, bytesToRead);
73+
74+
if (bytesRead == -1) {
75+
break; // End of file reached prematurely
76+
}
77+
78+
// Critical: Only write the bytes we actually read
79+
out.write(buffer, 0, bytesRead);
80+
81+
remaining -= bytesRead;
82+
}
83+
84+
}
85+
}
86+
4787
}

roda-ui/roda-wui/src/main/java/org/roda/wui/api/v2/services/DIPFileService.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
*/
88
package org.roda.wui.api.v2.services;
99

10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.Optional;
13+
1014
import org.roda.core.data.exceptions.AuthorizationDeniedException;
1115
import org.roda.core.data.exceptions.GenericException;
1216
import org.roda.core.data.exceptions.NotFoundException;
@@ -17,29 +21,36 @@
1721
import org.roda.core.data.v2.ip.DIPFile;
1822
import org.roda.core.model.LiteRODAObjectFactory;
1923
import org.roda.core.model.ModelService;
24+
import org.roda.core.storage.Binary;
25+
import org.roda.core.storage.ContentPayload;
2026
import org.roda.core.storage.DirectResourceAccess;
2127
import org.roda.core.storage.RangeConsumesOutputStream;
28+
import org.roda.core.storage.SeekableContentPayload;
2229
import org.roda.wui.common.model.RequestContext;
2330
import org.springframework.stereotype.Service;
2431

25-
import java.util.ArrayList;
26-
import java.util.List;
27-
import java.util.Optional;
28-
2932
/**
3033
*
3134
* @author Eduardo Teixeira <eteixeira@keep.pt>
3235
*/
3336
@Service
3437
public class DIPFileService {
3538
public RangeConsumesOutputStream retrieveDIPFileRangeStream(RequestContext requestContext, DIPFile dipfile)
36-
throws RequestNotValidException {
39+
throws RequestNotValidException {
3740
ModelService model = requestContext.getModelService();
3841
if (!dipfile.isDirectory()) {
3942
final RangeConsumesOutputStream stream;
4043
try {
41-
DirectResourceAccess directDIPFileAccess = model.getDirectAccess(dipfile);
42-
stream = new RangeConsumesOutputStream(directDIPFileAccess.getPath());
44+
Binary binary = model.getBinary(dipfile);
45+
ContentPayload payload = binary.getContent();
46+
47+
if (payload instanceof SeekableContentPayload) {
48+
SeekableContentPayload seekable = (SeekableContentPayload) payload;
49+
50+
stream = new RangeConsumesOutputStream(seekable, binary);
51+
} else {
52+
throw new RequestNotValidException("Range stream for file unsupported");
53+
}
4354
return stream;
4455
} catch (RequestNotValidException | GenericException | AuthorizationDeniedException | NotFoundException e) {
4556
throw new RuntimeException(e);
@@ -50,7 +61,7 @@ public RangeConsumesOutputStream retrieveDIPFileRangeStream(RequestContext reque
5061
}
5162

5263
public StreamResponse retrieveDIPFileStreamResponse(RequestContext requestContext, DIPFile dipFile)
53-
throws GenericException, RequestNotValidException, NotFoundException, AuthorizationDeniedException {
64+
throws GenericException, RequestNotValidException, NotFoundException, AuthorizationDeniedException {
5465
ModelService model = requestContext.getModelService();
5566

5667
final ConsumesOutputStream stream;

0 commit comments

Comments
 (0)