@Override
<<<<<<< HEAD
public StreamDataProvider buildDataProvider(DataSourceFilter dataFilter,
String oldSinceKey) throws Exception{
String host = config.get("host");
String portStr = config.get("port");
int port = Integer.parseInt(portStr);
String topic = config.get("topic");
String timeoutStr = config.get("timeout");
int timeout = timeoutStr != null ? Integer.parseInt(timeoutStr) : 10000;
int batchsize = Integer.parseInt(config.get("batchsize"));
=======
public StreamDataProvider buildDataProvider(final DataSourceFilter dataFilter,
String oldSinceKey,
ShardingStrategy shardingStrategy,
Set partitions) throws Exception
{
String zookeeperUrl = _conf.getString("zookeeperUrl");
String consumerGroupId = _conf.getString("consumerGroupId");
String topic = _conf.getString("topic");
int timeout = _conf.getInt("timeout",10000);
int batchsize = _conf.getInt("batchsize");
>>>>>>> 8f07528e1bcfcb8aab10113409e61bafb6f524e2
long offset = oldSinceKey == null ? 0L : Long.parseLong(oldSinceKey);
KafkaJsonStreamDataProvider provider = new KafkaJsonStreamDataProvider(_versionComparator,zookeeperUrl,timeout,batchsize,consumerGroupId,topic,offset);
if (dataFilter!=null){ |