Data streaming with Jini

Posted by {"name"=>"Palash Ray", "email"=>"paawak@gmail.com", "url"=>"https://www.linkedin.com/in/palash-ray/"} on January 03, 2016 · 5 mins read

Lets take a simple example of how we can stream data from a Jini Server to a Jini Client. Data Streaming means that we can send huge, unbounded data to a consumer. In this example, we will read from a DataStore and directly send it to the client.
We have the following simple api:

public interface BankDetailStreamingService extends Remote {
    void streamAllBankDetails(RemoteDataListener bankDetailRemoteListener) throws RemoteException;
}

The RemoteDataListener is a remote call-back where the data would be published from the Jini server as it becomes available.

public interface RemoteDataListener extends Remote {
    void newData(T data) throws RemoteException;
    void endOfData() throws RemoteException;
}

Note that this is a Remote interface as well.

Server Implementation

The server implementation is very simple: its a straight delegation to the Dao.

@Service
public class BankDetailStreamingServiceImpl implements BankDetailStreamingService {
    private final BankDetailStreamingDao bankDetailStreamingDao;
    @Autowired
    public BankDetailStreamingServiceImpl(BankDetailStreamingDao bankDetailStreamingDao) {
	this.bankDetailStreamingDao = bankDetailStreamingDao;
    }
    @Override
    public void streamAllBankDetails(RemoteDataListener bankDetailRemoteListener) {
	bankDetailStreamingDao.streamAllBankDetails(bankDetailRemoteListener);
    }
}

The Dao just pushes data to the RemoteDataListener as shown below:

@Repository
public class BankDetailStreamingDaoImpl implements BankDetailStreamingDao {
    private final JdbcTemplate jdbcTemplate;
    @Autowired
    public BankDetailStreamingDaoImpl(JdbcTemplate jdbcTemplate) {
	this.jdbcTemplate = jdbcTemplate;
    }
    @Override
    public void streamAllBankDetails(RemoteDataListener bankDetailRemoteListener) {
	RowMapper rowMapper = new BankDetailMapper();
	jdbcTemplate.query("select * from bank_details", new ResultSetExtractor() {
	    @Override
	    public Void extractData(ResultSet resultSet) throws SQLException, DataAccessException {
		while (resultSet.next()) {
		    try {
			bankDetailRemoteListener.newData(rowMapper.mapRow(resultSet, -1));
		    } catch (RemoteException e) {
			throw new RuntimeException(e);
		    }
		}
		try {
		    bankDetailRemoteListener.endOfData();
		} catch (RemoteException e) {
		    throw new RuntimeException(e);
		}
		return null;
	    }
	});
    }
}

Client Implementation

The trick really is in the client implementation. In this example, since the RemoteDataListener  is a Remote listener, it is an exported Jini service which lives on the client. This is an example where the Jini Client and Jini Server swaps roles and the client becomes the server. Just to illustrate our point, we have created a very simple client which just writes the data received on to the console.

public class RemoteDataListenerConsolePrinterImpl implements RemoteDataListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteDataListenerConsolePrinterImpl.class);
    @Override
    public void newData(T data) {
	LOGGER.info("New data recieved: {}", data);
    }
    @Override
    public void endOfData() {
	LOGGER.info("End of data, exiting...");
	System.exit(0);
    }
}

Then, we have a simple class with the main method to invoke the streaming service:

public class SimpleStreamingClient {
    public static void main(String[] args) throws RemoteException {
	SpringContextHelper springContextHelper = new SpringContextHelper();
	RemoteDataListener remoteDataListener = new RemoteDataListenerConsolePrinterImpl<>();
	Exporter exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory());
	@SuppressWarnings("unchecked")
	RemoteDataListener exportedRemoteDataListener = (RemoteDataListener) exporter.export(remoteDataListener);
	BankDetailStreamingService bankDetailStreamingService = springContextHelper.getBankDetailStreamingService();
	bankDetailStreamingService.streamAllBankDetails(exportedRemoteDataListener);
    }
}

Note that just before invoking the streaming service and passing the RemoteDataListener, it is being duly exported as a Jini service and the exported RemoteDataListener is then passed to the streaming service. That is the trick, really.
The SpringContextHelper is a simple class to load up Spring Context and help look up remote services:

public class SpringContextHelper {
    private static final Logger LOG = LoggerFactory.getLogger(SpringContextHelper.class);
    private final BankDetailService bankDetailService;
    private final BankDetailStreamingService bankDetailStreamingService;
    public SpringContextHelper() {
	String policyFilePath = SpringContextHelper.class.getResource("/policy.all").getFile();
	LOG.info("Starting with the policy file {}", policyFilePath);
	System.setProperty("java.security.policy", policyFilePath);
	if (System.getSecurityManager() == null) {
	    System.setSecurityManager(new SecurityManager());
	}
	try (ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(RmiClientConfig.class)) {
	    bankDetailService = context.getBean("bankDetailService", BankDetailService.class);
	    bankDetailStreamingService = context.getBean("bankDetailStreamingService", BankDetailStreamingService.class);
	}
    }
    public BankDetailService getBankDetailService() {
	return bankDetailService;
    }
    public BankDetailStreamingService getBankDetailStreamingService() {
	return bankDetailStreamingService;
    }
}

Running the example

We have embedded the Reggie in, so simply run the SpringNonSecureRmiServer to start the Reggie and the Jini Server. After the Jini Server starts up, run the SimpleStreamingClient. You would see the following on the console:

INFO [(JSK) mux request dispatch] RemoteDataListenerConsolePrinterImpl.newData(14) | New data recieved: BankDetail [id=4519, age=57, job=technician, marital=married, education=secondary, defaulted=no, balance=295, housing=no, loan=no, contact=cellular, day=19, month=aug, duration=151, campaign=11, pdays=-1, previous=0, poutcome=unknown, y=no]
INFO [(JSK) mux request dispatch] RemoteDataListenerConsolePrinterImpl.newData(14) | New data recieved: BankDetail [id=4520, age=28, job=blue-collar, marital=married, education=secondary, defaulted=no, balance=1137, housing=no, loan=no, contact=cellular, day=6, month=feb, duration=129, campaign=4, pdays=211, previous=3, poutcome=other, y=no]
INFO [(JSK) mux request dispatch] RemoteDataListenerConsolePrinterImpl.newData(14) | New data recieved: BankDetail [id=4521, age=44, job=entrepreneur, marital=single, education=tertiary, defaulted=no, balance=1136, housing=yes, loan=yes, contact=cellular, day=3, month=apr, duration=345, campaign=2, pdays=249, previous=7, poutcome=other, y=no]
INFO [(JSK) mux request dispatch] RemoteDataListenerConsolePrinterImpl.endOfData(19) | End of data, exiting...

Sources

The sources can be found here: https://github.com/paawak/blog/tree/master/code/jini/unsecure/streaming-with-jini
There are 3 Maven projects under that:

  1. rm-api
  2. rmi-client
  3. rmi-server