Class MergeDedupStoragePlugin
java.lang.Object
org.epics.archiverappliance.common.mergededup.MergeDedupStoragePlugin
- All Implemented Interfaces:
BiDirectionalIterable,ETLDest,ETLSource,StorageMetrics,Reader,StoragePlugin,Writer
public class MergeDedupStoragePlugin
extends Object
implements StoragePlugin, ETLSource, ETLDest, StorageMetrics, BiDirectionalIterable
The MergeDedupStoragePlugin is primarily meant for achieving a small amount of failover in the archiving of a PV.
The scheme is to have another appliance also archive the same PV and then merge the data from that appliance into this one during ETL.
This appliance is the dest appliance; this is where the data is merged into.
The other appliance is an independent EPICS archiver appliance archiving the same PV; that is, it is not part of this cluster of appliances.
There are no special requirements for the other appliance other than that it should archive the same PV (and, of course, run a reasonably similar version to this one).
No calls are made to the other appliance to clean up any data after consolidation; so, for convenience, the other appliance can be configured with a BlackholeStoragePlugin to automatically delete data after a certain time.
The MergeDedupStoragePlugin has two StoragePlugin parameters in its configuration.
- The dest parameter configures the data store in this appliance.
- The other parameter points to the backup appliance using the data_retrieval_url of the other appliance.
mergeInData BPL call; the PV needs to be paused for this purpose.
As an example, assume that you wish to merge the data when you move data from the MTS to LTS; then you define your MTS in this appliance as a MergeDedupStoragePlugin.
If, for example,
- you use pb://localhost?name=MTS&rootFolder=${ARCHAPPL_MEDIUM_TERM_FOLDER}&partitionGranularity=PARTITION_DAY&hold=2&gather=1 as your regular MTS, and
- the data_retrieval_url for the other appliance is http://localhost:17669/retrieval
then the MTS in this appliance would be defined as
merge://localhost?name=MTS
&dest=pb%3A%2F%2Flocalhost%3Fname%3DMTS%26rootFolder%3D%24%7BARCHAPPL_MEDIUM_TERM_FOLDER%7D%26partitionGranularity%3DPARTITION_DAY%26hold%3D2%26gather%3D1
&other=pbraw%3A%2F%2Flocalhost%3Fname%3DMTS%26rawURL%3Dhttp%253A%252F%252Flocalhost%253A17669%252Fretrieval%252Fdata%252FgetData.raw
where both the dest and other parameters are URL encoded.
- dest is the URL encoded version of pb://localhost?name=MTS&rootFolder=${ARCHAPPL_MEDIUM_TERM_FOLDER}&partitionGranularity=PARTITION_DAY&hold=2&gather=1
- other is the URL encoded version of pbraw://localhost?name=MTS&rawURL=http%3A%2F%2Flocalhost%3A17669%2Fretrieval%2Fdata%2FgetData.raw
- where the rawURL is the URL encoded version of http://localhost:17669/retrieval/data/getData.raw
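The nested encoding above is easy to get wrong by hand, so here is a minimal sketch of how the merge:// definition could be assembled. The class MergeUrlSketch and its methods are hypothetical helpers written for this illustration; they are not part of the archiver appliance API. The sketch assumes the raw retrieval endpoint is the data_retrieval_url followed by /data/getData.raw, as in the example above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the archiver appliance.
final class MergeUrlSketch {
    // URL-encode a value so it can be embedded as a single query parameter.
    static String enc(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    // Build a merge:// definition from the plain dest URL and the
    // data_retrieval_url of the other appliance.
    static String mergeUrl(String name, String destUrl, String dataRetrievalUrl) {
        String rawUrl = dataRetrievalUrl + "/data/getData.raw";
        // rawURL is encoded once here, and a second time below when the
        // whole "other" definition is encoded.
        String other = "pbraw://localhost?name=" + name + "&rawURL=" + enc(rawUrl);
        return "merge://localhost?name=" + name
                + "&dest=" + enc(destUrl)
                + "&other=" + enc(other);
    }

    public static void main(String[] args) {
        String dest = "pb://localhost?name=MTS&rootFolder=${ARCHAPPL_MEDIUM_TERM_FOLDER}"
                + "&partitionGranularity=PARTITION_DAY&hold=2&gather=1";
        System.out.println(mergeUrl("MTS", dest, "http://localhost:17669/retrieval"));
    }
}
```

The result is a single line; the example above shows the same definition split across lines for readability.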
From an implementation perspective, this can be understood as a plugin that delegates almost all calls to the dest plugin.
Calls that fetch data out of this plugin are merged/deduped with the other plugin.
- Author:
- mshankar
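To make the merge/dedup step in the class description concrete, the following is a minimal sketch of merging two time-ordered streams and dropping duplicate timestamps. It is not the plugin's actual implementation: the Sample record and the mergeDedup method are hypothetical, events are reduced to a timestamp and a value, and the sketch assumes that when both sources carry a sample with the same timestamp, the one from dest is kept.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; the real plugin works on EventStreams.
final class MergeDedupSketch {
    // Hypothetical stand-in for an archived event.
    record Sample(Instant timestamp, double value) {}

    // Merge two lists that are each sorted by timestamp into one sorted list
    // with at most one sample per timestamp. On a tie, the dest sample wins.
    static List<Sample> mergeDedup(List<Sample> dest, List<Sample> other) {
        List<Sample> merged = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < dest.size() || j < other.size()) {
            Sample next;
            if (j >= other.size()) {
                next = dest.get(i++);
            } else if (i >= dest.size()) {
                next = other.get(j++);
            } else {
                int cmp = dest.get(i).timestamp().compareTo(other.get(j).timestamp());
                if (cmp <= 0) {
                    next = dest.get(i++);
                    if (cmp == 0) {
                        j++; // same timestamp in both sources: skip the copy from other
                    }
                } else {
                    next = other.get(j++);
                }
            }
            // Keep timestamps strictly increasing, which also drops
            // duplicates that occur within a single source.
            if (merged.isEmpty()
                    || next.timestamp().isAfter(merged.get(merged.size() - 1).timestamp())) {
                merged.add(next);
            }
        }
        return merged;
    }
}
```

With this shape, gaps in dest (for example, from a restart of this appliance) are filled by samples from other, while overlapping samples appear only once.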
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.epics.archiverappliance.common.BiDirectionalIterable
BiDirectionalIterable.IterationDirection -
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
Modifier and Type, Method, Description
- int appendData(BasicContext context, String pvName, EventStream stream)
- boolean appendToETLAppendData(String pvName, EventStream stream, ETLContext context): This appends an EventStream to the ETL append data for a PV.
- boolean commitETLAppendData(String pvName, ETLContext context): This concatenates the ETL append data for a PV with the PV's destination data.
- boolean consolidateOnShutdown(): Should ETL move data from this source to the destination on shutdown.
- void convert(BasicContext context, String pvName, ConversionFunction conversionFuntion): Sometimes, PVs change types, EGUs etc.
- getAllStreams(String pv, ETLContext context): Given a pv and a time, this method returns all the streams.
- List<Callable<EventStream>> getDataForPV(BasicContext context, String pvName, Instant startTime, Instant endTime, PostProcessor postProcessor)
- getDescription(): Get a string description of this plugin; one that can potentially be used in log messages and provide context.
- List<ETLInfo> getETLStreams(String pv, Instant currentTime, ETLContext context): Given a pv and a time, this method returns all the streams that are ready for ETL.
- getFirstKnownEvent(BasicContext context, String pvName): Get the first event for this PV.
- getLastKnownEvent(BasicContext context, String pvName): Gets the last known event in this destination.
- getName(): Multiple PVs will probably use the same storage area and we identify the area using the name.
- long getTotalSpace(StorageMetricsContext storageMetricsContext): Gets the total space left on this device.
- long getUsableSpace(StorageMetricsContext storageMetricsContext): Gets the space available to this VM on this device.
- void initialize(String configURL, ConfigService configService): Each storage plugin is registered to a URI scheme; for example, the PlainStoragePBPlugin uses pb:// as the scheme.
- void iterate(BasicContext context, String pvName, Instant startAtTime, Predicate<Event> thePredicate, BiDirectionalIterable.IterationDirection direction, Period searchPeriod): Generic method to iterate over the data for specified PV.
- void markForDeletion(ETLInfo info, ETLContext context): Delete the ETLStream identified by info when you can as it has already been consumed by the ETL destination.
- pluginIdentifier(): Provide the prefix for storage plugin urls.
- void renamePV(BasicContext context, String oldName, String newName): Change the name of a PV.
- boolean runPostProcessors(String pvName, ArchDBRTypes dbrtype, ETLContext context): Run the post processors associated with this plugin if any for this pv.
- long spaceConsumedByPV(String pvName): Gets an estimate of the space consumed by this PV on this device.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.epics.archiverappliance.etl.ETLDest
etlInfoListProcessor
-
Field Details
-
MERGE_PLUGIN_IDENTIFIER
- See Also:
-
-
Constructor Details
-
MergeDedupStoragePlugin
public MergeDedupStoragePlugin()
-
-
Method Details
-
getDataForPV
public List<Callable<EventStream>> getDataForPV(BasicContext context, String pvName, Instant startTime, Instant endTime, PostProcessor postProcessor) throws IOException
- Specified by:
getDataForPV in interface Reader
- Throws:
IOException
-
getETLStreams
public List<ETLInfo> getETLStreams(String pv, Instant currentTime, ETLContext context) throws IOException
Description copied from interface: ETLSource
Given a pv and a time, this method returns all the streams that are ready for ETL. For example, if the partition granularity of a source is an hour, then this method returns all the streams that are in this source for the previous hours. Ideally, these streams must be closed for writing and should not change. The ETL process will consolidate these streams into the ETL destination, which is expected to be at a longer time granularity.
- Specified by:
getETLStreams in interface ETLSource
- Parameters:
pv - The name of PV.
currentTime - The time that is being used as the cutoff. If we pass in a timestamp way out into the future, we should return all the streams available.
context - ETLContext
- Returns:
List of ETLInfo
- Throws:
IOException
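The cutoff rule described for getETLStreams can be sketched as follows. This is an illustration under stated assumptions, not the plugin's code: partitions are represented only by their start times, the granularity is fixed at one hour, and the class EtlReadySketch and its method readyForEtl are hypothetical names.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only; real sources return ETLInfo objects, not Instants.
final class EtlReadySketch {
    // Return the hourly partitions that lie entirely before the partition
    // containing currentTime. The current partition may still be written to,
    // so it is not considered ready.
    static List<Instant> readyForEtl(List<Instant> partitionStarts, Instant currentTime) {
        Instant currentPartitionStart = currentTime.truncatedTo(ChronoUnit.HOURS);
        return partitionStarts.stream()
                .filter(start -> start.isBefore(currentPartitionStart))
                .collect(Collectors.toList());
    }
}
```

Passing a currentTime far in the future makes every partition qualify, which matches the description of the currentTime parameter.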
-
getAllStreams
Description copied from interface: ETLSource
Given a pv and a time, this method returns all the streams.
- Specified by:
getAllStreams in interface ETLSource
- Parameters:
pv - The name of PV.
context - ETLContext
- Returns:
List of ETLInfo
- Throws:
IOException
-
getFirstKnownEvent
Description copied from interface: Reader
Get the first event for this PV. This call is used to optimize away calls to other readers that have older data.
- Specified by:
getFirstKnownEvent in interface Reader
- Parameters:
context -
pvName - The PV name
- Returns:
Event The first event of pvName
- Throws:
IOException
-
getLastKnownEvent
Description copied from interface: Writer
Gets the last known event in this destination. Future events will be appended to this destination only if their timestamp is more recent than the timestamp of this event. If there is no last known event, then a null is returned.
- Specified by:
getLastKnownEvent in interface Writer
- Parameters:
context -
pvName - The PV name
- Returns:
Event The last known event of pvName
- Throws:
IOException
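The append rule described for getLastKnownEvent can be illustrated with a small filter. This is a sketch, not the Writer implementation: events are reduced to their timestamps, AppendFilterSketch and appendable are hypothetical names, and the sketch additionally assumes that each accepted event becomes the new last known event.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; real writers operate on Event objects.
final class AppendFilterSketch {
    // Return the incoming timestamps that would be appended, given the
    // timestamp of the last known event (null when there is none).
    static List<Instant> appendable(Instant lastKnown, List<Instant> incoming) {
        List<Instant> accepted = new ArrayList<>();
        Instant last = lastKnown;
        for (Instant ts : incoming) {
            // A null last known event means the destination is empty,
            // so the first incoming event is always accepted.
            if (last == null || ts.isAfter(last)) {
                accepted.add(ts);
                last = ts;
            }
        }
        return accepted;
    }
}
```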
-
appendData
- Specified by:
appendData in interface Writer
- Throws:
IOException
-
getTotalSpace
Description copied from interface: StorageMetrics
Gets the total space left on this device.
- Specified by:
getTotalSpace in interface StorageMetrics
- Parameters:
storageMetricsContext - StorageMetricsContext
- Returns:
getTotalSpace
- Throws:
IOException
-
getUsableSpace
Description copied from interface: StorageMetrics
Gets the space available to this VM on this device.
- Specified by:
getUsableSpace in interface StorageMetrics
- Parameters:
storageMetricsContext - StorageMetricsContext
- Returns:
getUsableSpace
- Throws:
IOException
-
spaceConsumedByPV
Description copied from interface: StorageMetrics
Gets an estimate of the space consumed by this PV on this device.
- Specified by:
spaceConsumedByPV in interface StorageMetrics
- Parameters:
pvName - The name of PV.
- Returns:
spaceConsumedByPV
- Throws:
IOException
-
appendToETLAppendData
public boolean appendToETLAppendData(String pvName, EventStream stream, ETLContext context) throws IOException
Description copied from interface: ETLDest
This appends an EventStream to the ETL append data for a PV.
- Specified by:
appendToETLAppendData in interface ETLDest
- Parameters:
pvName - The name of PV.
stream - The EventStream to append to the append data for a PV.
context - ETLContext
- Returns:
boolean True or False
- Throws:
IOException
-
commitETLAppendData
Description copied from interface: ETLDest
This concatenates the ETL append data for a PV with the PV's destination data.
- Specified by:
commitETLAppendData in interface ETLDest
- Parameters:
pvName - The name of PV.
context - ETLContext
- Returns:
boolean True or False
- Throws:
IOException
-
runPostProcessors
public boolean runPostProcessors(String pvName, ArchDBRTypes dbrtype, ETLContext context) throws IOException
Description copied from interface: ETLDest
Run the post processors associated with this plugin if any for this pv. The post processing is done after the commit and outside of the ETL transaction. This process is expected to catch up on previously missed/incomplete computation of cached post processing files. I can think of at least two use cases for this: one where we decide to go back and add a post processor for a pv, and one where we change the algorithm for the post processor and want to recompute all the cached files again.
- Specified by:
runPostProcessors in interface ETLDest
- Parameters:
pvName - The name of PV.
dbrtype - ArchDBRTypes
context - ETLContext
- Returns:
boolean True or False
- Throws:
IOException
-
markForDeletion
Description copied from interface: ETLSource
Delete the ETLStream identified by info when you can as it has already been consumed by the ETL destination. You can delete it later or immediately.
- Specified by:
markForDeletion in interface ETLSource
- Parameters:
info - ETLInfo
context - ETLContext
-
getPartitionGranularity
- Specified by:
getPartitionGranularity in interface ETLDest
- Specified by:
getPartitionGranularity in interface ETLSource
-
consolidateOnShutdown
public boolean consolidateOnShutdown()
Description copied from interface: ETLSource
Should ETL move data from this source to the destination on shutdown. For example, if you are using a ramdisk for the STS and you have a UPS, you can minimize any data loss by turning this bit on for data stores that are on the ramdisk. On shutdown, ETL will try to move the data out of this store into the next lifetime store.
- Specified by:
consolidateOnShutdown in interface ETLSource
- Returns:
boolean True or False
-
pluginIdentifier
Description copied from interface: StoragePlugin
Provide the prefix for storage plugin urls.
- Specified by:
pluginIdentifier in interface StoragePlugin
- Returns:
String of the Storage Plugin Identifier
-
getName
Description copied from interface: StoragePlugin
Multiple PVs will probably use the same storage area and we identify the area using the name. This is principally used in capacity planning/load balancing to identify the storage area for the PV. We should make sure that storages with similar lifetimes have the same name in all the appliances. The name is also used to identify the storage in the storage report. For example, the PlainStoragePlugin takes a name parameter and we should use something like STS as the identity for the short term store in all the appliances.
- Specified by:
getName in interface ETLDest
- Specified by:
getName in interface ETLSource
- Specified by:
getName in interface StorageMetrics
- Specified by:
getName in interface StoragePlugin
- Returns:
name
-
getDescription
Description copied from interface: StoragePlugin
Get a string description of this plugin; one that can potentially be used in log messages and provide context.
- Specified by:
getDescription in interface ETLDest
- Specified by:
getDescription in interface ETLSource
- Specified by:
getDescription in interface StoragePlugin
- Returns:
description
-
initialize
Description copied from interface: StoragePlugin
Each storage plugin is registered to a URI scheme; for example, the PlainStoragePBPlugin uses pb:// as the scheme. Configuration for a storage plugin typically comes in as a URL-like URI.
- The config service identifies the storage plugin using the scheme ("pb" maps to PlainStoragePBPlugin)
- Creates an instance using the default constructor.
- Calls initialize with the complete URL.
- Specified by:
initialize in interface StoragePlugin
- Parameters:
configURL - The complete URL
configService -
- Throws:
IOException
- See Also:
-
renamePV
Description copied from interface: StoragePlugin
Change the name of a PV. This happens occasionally in the EPICS world when people change the names of PVs but want to retain the data. This method is used to change the name of the PV in any of the datasets for PV oldName. For example, in PB files, the name of the PV is encoded in the file names and is also stored in the header. In this case, we expect the plugin to move the data to new file names and change the PV name in the file header. To avoid getting into issues about data changing when renaming files, the PV can be assumed to be in a paused state.
- Specified by:
renamePV in interface StoragePlugin
- Parameters:
context -
oldName - The old PV name
newName - The new PV name
- Throws:
IOException
-
convert
public void convert(BasicContext context, String pvName, ConversionFunction conversionFuntion) throws IOException
Description copied from interface: StoragePlugin
Sometimes, PVs change types, EGUs etc. In these cases, we are left with the problem of what to do with the already archived data. We can rename the PV to a new but related name - this keeps the existing data as is. Or, we can attempt to convert to the new type, EGU etc. This method can be used to convert the existing data using the supplied conversion function. Conversions should be all or nothing; that is, first convert all the streams into temporary chunks and then do a bulk rename once all the conversions have succeeded. Note that we'll also be using the same conversion mechanism for imports and other functions that change data. So, when/if implementing the conversion function, make sure we respect the typical expectations within the archiver - monotonically increasing timestamps and so on. To avoid getting into issues about data changing when converting, the PV can be assumed to be in a paused state.
- Specified by:
convert in interface StoragePlugin
- Parameters:
context -
pvName - The PV name
conversionFuntion -
- Throws:
IOException
-
iterate
public void iterate(BasicContext context, String pvName, Instant startAtTime, Predicate<Event> thePredicate, BiDirectionalIterable.IterationDirection direction, Period searchPeriod) throws IOException
Description copied from interface: BiDirectionalIterable
Generic method to iterate over the data for specified PV. We provide a time to start the iteration at (inclusive), a direction, and a Predicate. The plugin then iterates (forwards or backwards) and calls the Predicate for each sample. The iteration stops when the Predicate returns false or when we run out of samples. Iteration also stops when we get an exception, so this mode of traversal is vulnerable to corruption in the data. We may relax this constraint later by providing an optional bool to ignore exceptions.
- Specified by:
iterate in interface BiDirectionalIterable
- Parameters:
context -
pvName - The PV name
startAtTime - Start the iteration at this time. If going forwards, this is the first sample on or before the specified time. If going backwards, this is the first sample on or after the specified time.
thePredicate - The Predicate to apply
direction - Forwards or backwards
searchPeriod - An estimate on the amount of time we want to iterate. This is used to determine the appropriate chunks containing data at a very high level and thus to limit the search. The predicate itself is the one that controls when the iteration terminates. So, for example, you could specify a searchPeriod of 1 year but stop the iteration after 1 minute.
- Throws:
IOException
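The predicate-driven traversal described for iterate can be sketched over an in-memory list. This is a simplified illustration, not the plugin's implementation: samples are reduced to timestamps in a sorted list, the Direction enum and IterateSketch class are hypothetical, searchPeriod is omitted, and the starting sample is chosen according to one reading of the startAtTime description (the latest sample on or before the time when going forwards, the earliest sample on or after it when going backwards).

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative sketch only; real plugins iterate over Event objects on disk.
final class IterateSketch {
    enum Direction { FORWARDS, BACKWARDS }

    // Visit samples (sorted ascending) starting near startAtTime and moving in
    // the given direction until the predicate returns false or samples run out.
    // Returns the samples the predicate was called with, in visit order.
    static List<Instant> iterate(List<Instant> samples, Instant startAtTime,
                                 Predicate<Instant> predicate, Direction direction) {
        int n = samples.size();
        boolean forwards = direction == Direction.FORWARDS;
        int start;
        if (forwards) {
            // Latest sample on or before startAtTime; index 0 if there is none.
            start = 0;
            for (int i = 0; i < n; i++) {
                if (!samples.get(i).isAfter(startAtTime)) {
                    start = i;
                }
            }
        } else {
            // Earliest sample on or after startAtTime; last index if there is none.
            start = n - 1;
            for (int i = n - 1; i >= 0; i--) {
                if (!samples.get(i).isBefore(startAtTime)) {
                    start = i;
                }
            }
        }
        List<Instant> visited = new ArrayList<>();
        int step = forwards ? 1 : -1;
        for (int i = start; i >= 0 && i < n; i += step) {
            visited.add(samples.get(i));
            if (!predicate.test(samples.get(i))) {
                break; // the predicate, not the search period, ends the iteration
            }
        }
        return visited;
    }
}
```

A caller stops the traversal simply by returning false from the predicate, for example once it has seen the sample it was looking for.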
-