Class AppendDataStateData
java.lang.Object
edu.stanford.slac.archiverappliance.plain.AppendDataStateData
- Direct Known Subclasses:
ParquetAppendDataStateData, PBAppendDataStateData
Companion class to PlainStoragePlugin that handles the appending of event streams in a partition-aware fashion.
This is used both by the engine and by ETL.
- Author:
- mshankar
-
Field Summary
Fields
Modifier and Type                        Field
protected short                          currentEventsYear
protected final String                   desc
protected Instant                        lastKnownTimeStamp
protected final PartitionGranularity     partitionGranularity
protected Path                           previousFilePath
protected short                          previousYear
protected final PVNameToKeyMapping       pv2key
protected final String                   rootFolder
protected EventFileWriter                writer
Constructor Summary
Constructors
Constructor
AppendDataStateData(PartitionGranularity partitionGranularity, String rootFolder, String desc, Instant lastKnownTimestamp, PVNameToKeyMapping pv2key, PathResolver pathResolver)
Method Summary
Modifier and Type, Method, Description
abstract boolean bulkAppend(String pvName, ETLContext context, ETLBulkStream bulkStream, String extension, String extensionToCopyFrom)
    Append data in bulk, skipping some of the per-event checks.
protected Event checkStream(String pvName, ETLContext context, ETLBulkStream bulkStream, Class<? extends ETLBulkStream> streamType)
void closeStreams
protected abstract EventFileWriter createNewWriter(String pvName, Path pvPath, EventStream stream)
protected PathResolver getPathResolver
int partitionBoundaryAwareAppendData(BasicContext context, String pvName, EventStream stream, String extension, String extensionToCopyFrom)
    Append data into PB files, honoring partition boundaries and switching into new partitions as we cross the boundary.
protected Path preparePartition(String pvName, EventStream stream, BasicContext context, String extension, String extensionToCopyFrom, Instant ts, Path pvPath, PathResolver pathResolver)
    Prepare a new partition.
protected boolean shouldISkipEventBasedOnTimeStamps
    Tell appendData if we should skip this event based on the last known event, current year of the destination file, etc.
protected void shouldISwitchPartitions(BasicContext context, String pvName, String extension, Instant ts)
    Should we switch to a new partition? If so, return the new partition, else return the current partition.
protected abstract void updateStateBasedOnExistingFile(String pvName, Path pvPath)
-
Field Details
-
writer
-
rootFolder
-
desc
-
partitionGranularity
-
pv2key
-
previousFilePath
-
currentEventsYear
protected short currentEventsYear -
previousYear
protected short previousYear -
lastKnownTimeStamp
-
-
Constructor Details
-
AppendDataStateData
public AppendDataStateData(PartitionGranularity partitionGranularity, String rootFolder, String desc, Instant lastKnownTimestamp, PVNameToKeyMapping pv2key, PathResolver pathResolver)
- Parameters:
partitionGranularity - partitionGranularity of the PB plugin.
rootFolder - RootFolder of the PB plugin
desc - Desc for logging purposes
lastKnownTimestamp - This is probably the most important argument here. This is the last known timestamp in this storage. If null, we assume time(0) for the last known timestamp.
pv2key - PVNameToKeyMapping
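The null handling described for lastKnownTimestamp can be illustrated with a small sketch. It assumes that "time(0)" means the Unix epoch (Instant.EPOCH); the class and method names below are hypothetical and are not part of the archiver appliance API.

```java
import java.time.Instant;

final class LastKnownTimestampDefault {
    // Hypothetical helper: a null last-known timestamp is treated as the epoch,
    // which is our reading of "time(0)" in the parameter description.
    static Instant effectiveLastKnown(Instant lastKnownTimestamp) {
        return lastKnownTimestamp != null ? lastKnownTimestamp : Instant.EPOCH;
    }

    public static void main(String[] args) {
        // With no prior data, every event after the epoch counts as new.
        System.out.println(effectiveLastKnown(null)); // 1970-01-01T00:00:00Z
        // With prior data, the supplied timestamp is used unchanged.
        System.out.println(effectiveLastKnown(Instant.parse("2024-01-01T00:00:00Z")));
    }
}
```

Under this reading, an empty store accepts any event with a timestamp after the epoch.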
-
-
Method Details
-
getPathResolver
-
partitionBoundaryAwareAppendData
public int partitionBoundaryAwareAppendData(BasicContext context, String pvName, EventStream stream, String extension, String extensionToCopyFrom) throws IOException
Append data into PB files, honoring partition boundaries and switching into new partitions as we cross the boundary.
- We make sure timestamp monotonicity is maintained.
- We generate clean partitions.
- Parameters:
context -
pvName - The PV name
stream -
extension -
extensionToCopyFrom -
- Returns:
- eventsAppended
- Throws:
IOException-
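The two guarantees above (monotonic timestamps and clean partitions) can be sketched as a simple loop. This is only an illustration under stated assumptions, not the plugin's implementation: events are reduced to bare Instant values, partitions are UTC days held in memory rather than files, and "monotonic" is taken to mean strictly increasing. The class name PartitionAwareAppendSketch and its methods are hypothetical.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartitionAwareAppendSketch {
    // Partition start (UTC midnight) -> events stored in that partition.
    private final Map<Instant, List<Instant>> partitions = new LinkedHashMap<>();
    // Assumption: with no prior data, the last known timestamp is the epoch.
    private Instant lastKnown = Instant.EPOCH;

    // Appends events in order and returns how many were actually stored.
    int append(List<Instant> stream) {
        int eventsAppended = 0;
        for (Instant ts : stream) {
            // Monotonicity: drop anything not strictly after the last stored event.
            if (!ts.isAfter(lastKnown)) {
                continue;
            }
            // Clean partitions: each event lands in the partition of its own day.
            Instant partitionStart = ts.truncatedTo(ChronoUnit.DAYS);
            partitions.computeIfAbsent(partitionStart, k -> new ArrayList<>()).add(ts);
            lastKnown = ts;
            eventsAppended++;
        }
        return eventsAppended;
    }

    Map<Instant, List<Instant>> partitions() {
        return partitions;
    }

    public static void main(String[] args) {
        PartitionAwareAppendSketch sketch = new PartitionAwareAppendSketch();
        int n = sketch.append(List.of(
                Instant.parse("2024-01-01T23:59:59Z"),
                Instant.parse("2024-01-01T10:00:00Z"),   // out of order, skipped
                Instant.parse("2024-01-02T00:00:01Z"))); // crosses the day boundary
        System.out.println(n + " events in " + sketch.partitions().size() + " partitions");
    }
}
```

In this sketch the out-of-order event is dropped and the third event opens a second partition, so two events are stored across two partitions.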
-
checkStream
protected Event checkStream(String pvName, ETLContext context, ETLBulkStream bulkStream, Class<? extends ETLBulkStream> streamType) throws IOException
- Throws:
IOException
-
bulkAppend
public abstract boolean bulkAppend(String pvName, ETLContext context, ETLBulkStream bulkStream, String extension, String extensionToCopyFrom) throws IOException
Append data in bulk, skipping some of the per-event checks.
- Parameters:
pvName - The PV name
context - The ETL context
bulkStream - The ETL bulk stream
extension -
extensionToCopyFrom -
- Returns:
- boolean
- Throws:
IOException-
-
shouldISwitchPartitions
protected void shouldISwitchPartitions(BasicContext context, String pvName, String extension, Instant ts) throws IOException
Should we switch to a new partition? If so, return the new partition, else return the current partition.
- Parameters:
context -
pvName - The PV name
extension -
ts - The epoch seconds
- Throws:
IOException-
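The partition-switch decision can be pictured as comparing the partition that a timestamp falls into with the partition currently being written. The sketch below is a guess at the general idea only: the Granularity enum stands in for PartitionGranularity (whose real values are not listed here), partitions are computed in UTC, and all names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

final class PartitionSwitchSketch {
    // Hypothetical stand-in for PartitionGranularity.
    enum Granularity { HOUR, DAY, MONTH, YEAR }

    // Start of the partition (in UTC) that contains the given timestamp.
    static Instant partitionStart(Instant ts, Granularity granularity) {
        ZonedDateTime utc = ts.atZone(ZoneOffset.UTC);
        switch (granularity) {
            case HOUR:
                return utc.truncatedTo(ChronoUnit.HOURS).toInstant();
            case DAY:
                return utc.truncatedTo(ChronoUnit.DAYS).toInstant();
            case MONTH:
                return utc.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant();
            case YEAR:
                return utc.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1).toInstant();
            default:
                throw new IllegalArgumentException("Unknown granularity " + granularity);
        }
    }

    // True when nothing is open yet or ts belongs to a different partition.
    static boolean shouldSwitch(Instant currentPartitionStart, Instant ts, Granularity granularity) {
        return currentPartitionStart == null
                || !partitionStart(ts, granularity).equals(currentPartitionStart);
    }

    public static void main(String[] args) {
        Instant lastOfJanuary = Instant.parse("2024-01-31T23:59:59Z");
        Instant firstOfFebruary = Instant.parse("2024-02-01T00:00:00Z");
        Instant current = partitionStart(lastOfJanuary, Granularity.MONTH);
        System.out.println(shouldSwitch(current, firstOfFebruary, Granularity.MONTH)); // true
        System.out.println(shouldSwitch(current, lastOfJanuary, Granularity.MONTH));   // false
    }
}
```

The real method is declared void, so it presumably updates internal state (such as the current writer and file path) rather than returning a value; the boolean here is used only to make the decision visible.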
-
shouldISkipEventBasedOnTimeStamps
Tell appendData if we should skip this event based on the last known event, current year of the destination file, etc.
- Parameters:
event -
- Returns:
- Boolean
-
preparePartition
protected Path preparePartition(String pvName, EventStream stream, BasicContext context, String extension, String extensionToCopyFrom, Instant ts, Path pvPath, PathResolver pathResolver) throws IOException
Prepare a new partition.
- Parameters:
pvName - The PV name
stream -
context -
extension -
extensionToCopyFrom -
ts - The epoch seconds
pvPath -
- Returns:
- pvPath
- Throws:
IOException-
-
createNewWriter
protected abstract EventFileWriter createNewWriter(String pvName, Path pvPath, EventStream stream) throws IOException
- Throws:
IOException
-
updateStateBasedOnExistingFile
protected abstract void updateStateBasedOnExistingFile(String pvName, Path pvPath) throws IOException
- Throws:
IOException
-
closeStreams
- Throws:
IOException
-