Example of using BSPump with CSV files
In the previous article, I introduced the stream-analyzer library called BSPump, which originated during one of our workshops. Its usage ranges from simple data processing to real-time data analysis, metric computation, and anomaly detection. However, it may all sound abstract to you until you discover the basic rules and elegance of its implementation. That is exactly what I am going to show you in the following article.
Let us say we have a CSV file, no, wait, a lot of CSV files that keep arriving in our storage directory at a blazing speed. We do not have the slightest notion of what the files are named; the only thing we know is that we need to process their data and deliver them to our database as quickly as possible. After a minute of thinking (and that is a lot of time for us software engineers), we come up with a solution: we need to write a simple program whose instances run on many computers, processors, or processor cores at once, so that together they can quickly process the CSV files in our storage directory (or any other medium or endpoint, to be general). We need to make sure that each stored file is loaded by only one of these instances, that the data are processed and sent to the database asynchronously line by line, and that every instance of the program can handle file after file until there are none left. It seems like tough work, doesn’t it? Not with BSPump.
Let's do it
First, we need to take a look at the BSPump GitHub repository, which you can find here: https://github.com/LibertyAces/BitSwanPump There is an example file called bspump-csv.py that contains the basic skeleton we need to develop our solution. You may notice that there are just a few lines of code in the example – that is the core of the magic of BSPump!
The example first creates an application object and stores it in a local variable:
app = bspump.BSPumpApplication()
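For completeness, here is a minimal sketch of how the whole application might be wired together, following the structure of bspump-csv.py. It assumes the SamplePipeline class discussed below; the "bspump.PumpService" lookup and the 'SamplePipeline' id mirror the repository example:

import bspump
import bspump.common
import bspump.file
import bspump.trigger

app = bspump.BSPumpApplication()

# The pump service keeps track of all registered pipelines
svc = app.get_service("bspump.PumpService")
svc.add_pipeline(SamplePipeline(app, 'SamplePipeline'))

# Enter the asynchronous main loop
app.run()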
Since we need to obtain, process, and send data, we also need to create a so-called pipeline, which holds an ordered sequence of processor objects that receive, modify, and pass data to one another. You can see the code of the sample pipeline in the top part of the example. The pipeline is represented by the class named SamplePipeline, and the core of its function dwells in the self.build method.
Let us take a closer look:
self.Sink = bspump.file.FileCSVSink(app, self, config={'path': 'out.csv'})
self.build(
    bspump.file.FileCSVSource(app, self, config={
        'path': 'sample.csv',
        'delimiter': ';'
    }).on(bspump.trigger.RunOnceTrigger(app)),
    bspump.common.PPrintProcessor(app, self),
    self.Sink
)
The first processor that we are going to pass to the pipeline is FileCSVSource, which is able to read a CSV file or files as Python dictionaries. The example specifies a single file named sample.csv; however, we can also specify a storage directory with more files. The simplest way to do it is to change the value of the path attribute from sample.csv to, let us say, ./data/*. This little change means that we are going to read all files from the data directory in our repository. Now you may ask: if there are more instances of the program running, will they all read all the files present in the directory? The answer may surprise you, because the default behavior of BSPump makes sure that every file is processed only once, regardless of how many parallel instances are actually running. Each file is locked while one of the instances is reading it, and once the file is processed, this state is indicated by a “-processed” suffix added to the filename, so no other instance will try to read it again. BSPump simply takes care of it. The only thing we need to do is to remove the RunOnceTrigger, which runs the pipeline exactly once, and use, for example, the OpportunisticTrigger, which keeps restarting the pipeline until there are no files available.
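Putting those two changes together, the source definition inside self.build might look like this (a minimal sketch using only the path and trigger mentioned above):

bspump.file.FileCSVSource(app, self, config={
    'path': './data/*',   # glob pattern: read every file in the data directory
    'delimiter': ';'
}).on(bspump.trigger.OpportunisticTrigger(app)),  # restart until no files remain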
The sample pipeline then contains a processor that does nothing but print the output data to the screen, and a CSV sink that saves the data to a specified output CSV file. We can of course use our own processors and sinks, for example the pre-built ElasticSearch sink:
bspump.elasticsearch.ElasticSearchSink(app, self, es_connection, config={
    "index_prefix": "YOUR_INDEX_PREFIX",
    "doctype": "YOUR_DOCTYPE"
})
The ElasticSearch sink receives a connection (as a parameter) that handles the communication between our program and the ElasticSearch instance running at a specified endpoint (URL). If you are more interested in this feature and its configuration, please see the ElasticSearch example file called bspump-es.py in the BSPump repository. If we combine both examples together, we nearly obtain our desired solution.
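For illustration, creating and registering such a connection might look roughly like this, following the pattern of bspump-es.py (the "ESConnection" id is just an example name; the endpoint URL itself is supplied through the connection’s configuration, so check the repository example for the exact options):

import bspump.elasticsearch

# Register a named connection with the pump service so that sinks can use it
svc = app.get_service("bspump.PumpService")
es_connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
svc.add_connection(es_connection)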
Build your own processor
Now we can use our own processor instead of the previously mentioned PPrintProcessor. Its code is quite simple:
from bspump import Processor

class PrintProcessor(Processor):

    def process(self, context, event):
        # Print the incoming event (one parsed CSV line) and pass it on unchanged
        print(event)
        return event
You can see that all it really does is print the input data, which are represented by the event attribute. Creating your own processor is that straightforward. In our example, the event attribute contains one line of the processed CSV file (whose name can be obtained through the context variable). The line’s data are represented by an ordered Python dictionary. We can now create our own class and include it in our sample pipeline, as shown below. And that’s it. There is nothing more you have to do to create a functioning program that asynchronously reads CSV files, performs your specified transformations, and delivers the data to a single database or other type of storage. You can of course move each class to a separate file, create Python modules, comment your methods, and make the code clean and crystal clear to other developers in your company.
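To plug it in, we simply swap PPrintProcessor for our own class in the pipeline definition (a sketch based on the build call shown earlier):

self.build(
    bspump.file.FileCSVSource(app, self, config={
        'path': './data/*',
        'delimiter': ';'
    }).on(bspump.trigger.OpportunisticTrigger(app)),
    PrintProcessor(app, self),  # our custom processor instead of PPrintProcessor
    self.Sink
)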
Image: Visualization of processed sample data loaded from CSV files with BSPump.
Conclusion
I hope this simple example illustrates how easy it is to create a seemingly complex application with BSPump, pipelines, and the asynchronous approach. If you have any ideas on how we can improve the concept or extend the capabilities of BSPump or the ASAB server that BSPump is based upon, please let us know or contribute to the code on GitHub: https://github.com/LibertyAces/BitSwanPump